diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java index 00afc064f60..d7c456dd594 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java @@ -300,10 +300,27 @@ public class IndexNameExpressionResolver { if (request.indices() == null || (request.indices() != null && request.indices().length != 1)) { throw new IllegalArgumentException("indices request must specify a single index expression"); } - Context context = new Context(state, request.indicesOptions(), false, true); - Index[] indices = concreteIndices(context, request.indices()[0]); + return concreteWriteIndex(state, request.indicesOptions(), request.indices()[0], false); + } + + /** + * Utility method that allows to resolve an index expression to its corresponding single write index. + * + * @param state the cluster state containing all the data to resolve to expression to a concrete index + * @param options defines how the aliases or indices need to be resolved to concrete indices + * @param index index that can be resolved to alias or index name. + * @param allowNoIndices whether to allow resolve to no index + * @throws IllegalArgumentException if the index resolution does not lead to an index, or leads to more than one index + * @return the write index obtained as a result of the index resolution or null if no index + */ + public Index concreteWriteIndex(ClusterState state, IndicesOptions options, String index, boolean allowNoIndices) { + Context context = new Context(state, options, false, true); + Index[] indices = concreteIndices(context, index); + if (allowNoIndices && indices.length == 0) { + return null; + } if (indices.length != 1) { - throw new IllegalArgumentException("The index expression [" + request.indices()[0] + + throw new IllegalArgumentException("The index expression [" + index + "] and options provided did not point to a single write-index"); } return indices[0]; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java index 5de1186767f..fc7d44c1916 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java @@ -19,6 +19,7 @@ import org.elasticsearch.protocol.xpack.license.LicenseStatus; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xpack.core.action.XPackInfoAction; +import java.util.Collection; import java.util.EnumSet; import java.util.Iterator; import java.util.List; @@ -229,7 +230,7 @@ public final class RemoteClusterLicenseChecker { * @param indices the collection of index names * @return true if the collection of index names contains a name that represents a remote index, otherwise false */ - public static boolean containsRemoteIndex(final List indices) { + public static boolean containsRemoteIndex(final Collection indices) { return indices.stream().anyMatch(RemoteClusterLicenseChecker::isRemoteIndex); } @@ -240,7 +241,7 @@ public final class RemoteClusterLicenseChecker { * @param indices the collection of index names * @return list of index names that represent remote index names */ - public static List remoteIndices(final List indices) { + public static List remoteIndices(final Collection indices) { return indices.stream().filter(RemoteClusterLicenseChecker::isRemoteIndex).collect(Collectors.toList()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java index 9b27dc0e71d..4a532ccbf67 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java @@ -611,6 +611,11 @@ public class XPackLicenseState { return status.active; } + public static boolean isTransformAllowedForOperationMode(final OperationMode operationMode) { + // any license (basic and upwards) + return operationMode != License.OperationMode.MISSING; + } + /** * Rollup is always available as long as there is a valid license * diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidator.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidator.java new file mode 100644 index 00000000000..ef28fed54cf --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidator.java @@ -0,0 +1,460 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.common.validation; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.ValidationException; +import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.indices.InvalidIndexNameException; +import org.elasticsearch.license.RemoteClusterLicenseChecker; +import org.elasticsearch.protocol.xpack.license.LicenseStatus; +import org.elasticsearch.transport.NoSuchRemoteClusterException; +import org.elasticsearch.transport.RemoteClusterService; + +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import static org.elasticsearch.action.ValidateActions.addValidationError; +import static org.elasticsearch.cluster.metadata.MetaDataCreateIndexService.validateIndexOrAliasName; + +/** + * Validation of source indexes and destination index. + * + * Validations are separated into validators to choose from, e.g. you want to run different types of validations for + * preview/create/start with or without support for remote clusters + */ +public final class SourceDestValidator { + + // messages + public static final String SOURCE_INDEX_MISSING = "Source index [{0}] does not exist"; + public static final String SOURCE_LOWERCASE = "Source index [{0}] must be lowercase"; + public static final String DEST_IN_SOURCE = "Destination index [{0}] is included in source expression [{1}]"; + public static final String DEST_LOWERCASE = "Destination index [{0}] must be lowercase"; + public static final String NEEDS_REMOTE_CLUSTER_SEARCH = "Source index is configured with a remote index pattern(s) [{0}]" + + " but the current node [{1}] is not allowed to connect to remote clusters." + + " Please enable cluster.remote.connect for all data nodes."; + public static final String ERROR_REMOTE_CLUSTER_SEARCH = "Error resolving remote source: {0}"; + public static final String UNKNOWN_REMOTE_CLUSTER_LICENSE = "Error during license check ({0}) for remote cluster " + + "alias(es) {1}, error: {2}"; + public static final String FEATURE_NOT_LICENSED_REMOTE_CLUSTER_LICENSE = "License check failed for remote cluster " + + "alias [{0}], at least a [{1}] license is required, found license [{2}]"; + public static final String REMOTE_CLUSTER_LICENSE_INACTIVE = "License check failed for remote cluster " + + "alias [{0}], license is not active"; + + private final IndexNameExpressionResolver indexNameExpressionResolver; + private final RemoteClusterService remoteClusterService; + private final RemoteClusterLicenseChecker remoteClusterLicenseChecker; + private final String nodeName; + private final String license; + + /* + * Internal shared context between validators. + */ + static class Context { + private final ClusterState state; + private final IndexNameExpressionResolver indexNameExpressionResolver; + private final RemoteClusterService remoteClusterService; + private final RemoteClusterLicenseChecker remoteClusterLicenseChecker; + private final String[] source; + private final String dest; + private final String nodeName; + private final String license; + + private ValidationException validationException = null; + private SortedSet resolvedSource = null; + private SortedSet resolvedRemoteSource = null; + private String resolvedDest = null; + + Context( + final ClusterState state, + final IndexNameExpressionResolver indexNameExpressionResolver, + final RemoteClusterService remoteClusterService, + final RemoteClusterLicenseChecker remoteClusterLicenseChecker, + final String[] source, + final String dest, + final String nodeName, + final String license + ) { + this.state = state; + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.remoteClusterService = remoteClusterService; + this.remoteClusterLicenseChecker = remoteClusterLicenseChecker; + this.source = source; + this.dest = dest; + this.nodeName = nodeName; + this.license = license; + } + + public ClusterState getState() { + return state; + } + + public RemoteClusterService getRemoteClusterService() { + return remoteClusterService; + } + + public RemoteClusterLicenseChecker getRemoteClusterLicenseChecker() { + return remoteClusterLicenseChecker; + } + + public IndexNameExpressionResolver getIndexNameExpressionResolver() { + return indexNameExpressionResolver; + } + + public boolean isRemoteSearchEnabled() { + return remoteClusterLicenseChecker != null; + } + + public String[] getSource() { + return source; + } + + public String getDest() { + return dest; + } + + public String getNodeName() { + return nodeName; + } + + public String getLicense() { + return license; + } + + public SortedSet resolveSource() { + if (resolvedSource == null) { + resolveLocalAndRemoteSource(); + } + + return resolvedSource; + } + + public SortedSet resolveRemoteSource() { + if (resolvedRemoteSource == null) { + resolveLocalAndRemoteSource(); + } + + return resolvedRemoteSource; + } + + public String resolveDest() { + if (resolvedDest == null) { + try { + Index singleWriteIndex = indexNameExpressionResolver.concreteWriteIndex( + state, + IndicesOptions.lenientExpandOpen(), + dest, + true + ); + + resolvedDest = singleWriteIndex != null ? singleWriteIndex.getName() : dest; + } catch (IllegalArgumentException e) { + // stop here as we can not return a single dest index + addValidationError(e.getMessage()); + throw validationException; + } + } + + return resolvedDest; + } + + public ValidationException addValidationError(String error, Object... args) { + if (validationException == null) { + validationException = new ValidationException(); + } + + validationException.addValidationError(getMessage(error, args)); + + return validationException; + } + + public ValidationException getValidationException() { + return validationException; + } + + // convenience method to make testing easier + public Set getRegisteredRemoteClusterNames() { + return remoteClusterService.getRegisteredRemoteClusterNames(); + } + + private void resolveLocalAndRemoteSource() { + resolvedSource = new TreeSet<>(Arrays.asList(source)); + resolvedRemoteSource = new TreeSet<>(RemoteClusterLicenseChecker.remoteIndices(resolvedSource)); + resolvedSource.removeAll(resolvedRemoteSource); + + // special case: if indexNameExpressionResolver gets an empty list it treats it as _all + if (resolvedSource.isEmpty() == false) { + resolvedSource = new TreeSet<>( + Arrays.asList( + indexNameExpressionResolver.concreteIndexNames( + state, + DEFAULT_INDICES_OPTIONS_FOR_VALIDATION, + resolvedSource.toArray(new String[0]) + ) + ) + ); + } + } + } + + interface SourceDestValidation { + void validate(Context context, ActionListener listener); + } + + // note: this is equivalent to the default for search requests + private static final IndicesOptions DEFAULT_INDICES_OPTIONS_FOR_VALIDATION = IndicesOptions + .strictExpandOpenAndForbidClosedIgnoreThrottled(); + + public static final SourceDestValidation SOURCE_MISSING_VALIDATION = new SourceMissingValidation(); + public static final SourceDestValidation REMOTE_SOURCE_VALIDATION = new RemoteSourceEnabledAndRemoteLicenseValidation(); + public static final SourceDestValidation DESTINATION_IN_SOURCE_VALIDATION = new DestinationInSourceValidation(); + public static final SourceDestValidation DESTINATION_SINGLE_INDEX_VALIDATION = new DestinationSingleIndexValidation(); + + // set of default validation collections, if you want to automatically benefit from new validators, use those + public static final List PREVIEW_VALIDATIONS = Arrays.asList(SOURCE_MISSING_VALIDATION, REMOTE_SOURCE_VALIDATION); + + public static final List ALL_VALIDATIONS = Arrays.asList( + SOURCE_MISSING_VALIDATION, + REMOTE_SOURCE_VALIDATION, + DESTINATION_IN_SOURCE_VALIDATION, + DESTINATION_SINGLE_INDEX_VALIDATION + ); + + public static final List NON_DEFERABLE_VALIDATIONS = Arrays.asList(DESTINATION_SINGLE_INDEX_VALIDATION); + + /** + * Create a new Source Dest Validator + * + * @param indexNameExpressionResolver A valid IndexNameExpressionResolver object + * @param remoteClusterService A valid RemoteClusterService object + * @param remoteClusterLicenseChecker A RemoteClusterLicenseChecker or null if CCS is disabled + * @param nodeName the name of this node + * @param license the license of the feature validated for + */ + public SourceDestValidator( + IndexNameExpressionResolver indexNameExpressionResolver, + RemoteClusterService remoteClusterService, + RemoteClusterLicenseChecker remoteClusterLicenseChecker, + String nodeName, + String license + ) { + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.remoteClusterService = remoteClusterService; + this.remoteClusterLicenseChecker = remoteClusterLicenseChecker; + this.nodeName = nodeName; + this.license = license; + } + + /** + * Run validation against source and dest. + * + * @param clusterState The current ClusterState + * @param source an array of source indexes + * @param dest destination index + * @param validations list of of validations to run + * @param listener result listener + */ + public void validate( + final ClusterState clusterState, + final String[] source, + final String dest, + final List validations, + final ActionListener listener + ) { + Context context = new Context( + clusterState, + indexNameExpressionResolver, + remoteClusterService, + remoteClusterLicenseChecker, + source, + dest, + nodeName, + license + ); + + ActionListener validationListener = ActionListener.wrap(c -> { + if (c.getValidationException() != null) { + listener.onFailure(c.getValidationException()); + } else { + listener.onResponse(true); + } + }, listener::onFailure); + + for (int i = validations.size() - 1; i >= 0; i--) { + final SourceDestValidation validation = validations.get(i); + final ActionListener previousValidationListener = validationListener; + validationListener = ActionListener.wrap(c -> { validation.validate(c, previousValidationListener); }, listener::onFailure); + } + + validationListener.onResponse(context); + } + + /** + * Validate dest request. + * + * This runs a couple of simple validations at request time, to be executed from a {@link ActionRequest}} + * implementation. + * + * Note: Source can not be validated at request time as it might contain expressions. + * + * @param validationException an ActionRequestValidationException for collection validation problem, can be null + * @param dest destination index, null if validation shall be skipped + */ + public static ActionRequestValidationException validateRequest( + @Nullable ActionRequestValidationException validationException, + @Nullable String dest + ) { + try { + if (dest != null) { + validateIndexOrAliasName(dest, InvalidIndexNameException::new); + if (dest.toLowerCase(Locale.ROOT).equals(dest) == false) { + validationException = addValidationError(getMessage(DEST_LOWERCASE, dest), validationException); + } + } + } catch (InvalidIndexNameException ex) { + validationException = addValidationError(ex.getMessage(), validationException); + } + + return validationException; + } + + static class SourceMissingValidation implements SourceDestValidation { + + @Override + public void validate(Context context, ActionListener listener) { + try { + // non-trivia: if source contains a wildcard index, which does not resolve to a concrete index + // the resolved indices might be empty, but we can check if source contained something, this works because + // of no wildcard index is involved the resolve would have thrown an exception + if (context.resolveSource().isEmpty() && context.resolveRemoteSource().isEmpty() && context.getSource().length == 0) { + context.addValidationError(SOURCE_INDEX_MISSING, Strings.arrayToCommaDelimitedString(context.getSource())); + } + } catch (IndexNotFoundException e) { + context.addValidationError(e.getMessage()); + } + listener.onResponse(context); + } + } + + static class RemoteSourceEnabledAndRemoteLicenseValidation implements SourceDestValidation { + @Override + public void validate(Context context, ActionListener listener) { + if (context.resolveRemoteSource().isEmpty()) { + listener.onResponse(context); + return; + } + + List remoteIndices = new ArrayList<>(context.resolveRemoteSource()); + // we can only check this node at the moment, clusters with mixed CCS enabled/disabled nodes are not supported, + // see gh#50033 + if (context.isRemoteSearchEnabled() == false) { + context.addValidationError(NEEDS_REMOTE_CLUSTER_SEARCH, context.resolveRemoteSource(), context.getNodeName()); + listener.onResponse(context); + return; + } + + // this can throw + List remoteAliases; + try { + remoteAliases = RemoteClusterLicenseChecker.remoteClusterAliases(context.getRegisteredRemoteClusterNames(), remoteIndices); + } catch (NoSuchRemoteClusterException e) { + context.addValidationError(e.getMessage()); + listener.onResponse(context); + return; + } catch (Exception e) { + context.addValidationError(ERROR_REMOTE_CLUSTER_SEARCH, e.getMessage()); + listener.onResponse(context); + return; + } + + context.getRemoteClusterLicenseChecker().checkRemoteClusterLicenses(remoteAliases, ActionListener.wrap(response -> { + if (response.isSuccess() == false) { + if (response.remoteClusterLicenseInfo().licenseInfo().getStatus() != LicenseStatus.ACTIVE) { + context.addValidationError(REMOTE_CLUSTER_LICENSE_INACTIVE, response.remoteClusterLicenseInfo().clusterAlias()); + } else { + context.addValidationError( + FEATURE_NOT_LICENSED_REMOTE_CLUSTER_LICENSE, + response.remoteClusterLicenseInfo().clusterAlias(), + context.getLicense(), + response.remoteClusterLicenseInfo().licenseInfo().getType() + ); + } + } + listener.onResponse(context); + }, e -> { + context.addValidationError(UNKNOWN_REMOTE_CLUSTER_LICENSE, context.getLicense(), remoteAliases, e.getMessage()); + listener.onResponse(context); + })); + } + } + + static class DestinationInSourceValidation implements SourceDestValidation { + + @Override + public void validate(Context context, ActionListener listener) { + final String destIndex = context.getDest(); + boolean foundSourceInDest = false; + + for (String src : context.getSource()) { + if (Regex.simpleMatch(src, destIndex)) { + context.addValidationError(DEST_IN_SOURCE, destIndex, src); + // do not return immediately but collect all errors and than return + foundSourceInDest = true; + } + } + + if (foundSourceInDest) { + listener.onResponse(context); + return; + } + + if (context.resolvedSource.contains(destIndex)) { + context.addValidationError(DEST_IN_SOURCE, destIndex, Strings.arrayToCommaDelimitedString(context.getSource())); + listener.onResponse(context); + return; + } + + if (context.resolvedSource.contains(context.resolveDest())) { + context.addValidationError( + DEST_IN_SOURCE, + context.resolveDest(), + Strings.collectionToCommaDelimitedString(context.resolveSource()) + ); + } + + listener.onResponse(context); + } + } + + static class DestinationSingleIndexValidation implements SourceDestValidation { + + @Override + public void validate(Context context, ActionListener listener) { + context.resolveDest(); + listener.onResponse(context); + } + } + + private static String getMessage(String message, Object... args) { + return new MessageFormat(message, Locale.ROOT).format(args); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java index 9708cd301e4..b1e9dffea50 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java @@ -21,9 +21,6 @@ public class TransformMessages { "Failed to validate configuration"; public static final String REST_PUT_FAILED_PERSIST_TRANSFORM_CONFIGURATION = "Failed to persist transform configuration"; public static final String REST_PUT_TRANSFORM_FAILED_TO_DEDUCE_DEST_MAPPINGS = "Failed to deduce dest mappings"; - public static final String REST_PUT_TRANSFORM_SOURCE_INDEX_MISSING = "Source index [{0}] does not exist"; - public static final String REST_PUT_TRANSFORM_DEST_IN_SOURCE = "Destination index [{0}] is included in source expression [{1}]"; - public static final String REST_PUT_TRANSFORM_DEST_SINGLE_INDEX = "Destination index [{0}] should refer to a single index"; public static final String REST_PUT_TRANSFORM_INCONSISTENT_ID = "Inconsistent id; ''{0}'' specified in the body differs from ''{1}'' specified as a URL argument"; public static final String TRANSFORM_CONFIG_INVALID = "Transform configuration is invalid [{0}]"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java index 9eabba779e4..b8f11d7fcbf 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.xpack.core.common.validation.SourceDestValidator; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.transforms.DestConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; @@ -67,7 +68,7 @@ public class PreviewTransformAction extends ActionType destMap = (Map)providedDestination; + Map destMap = (Map) providedDestination; String pipeline = destMap.get(DestConfig.PIPELINE.getPreferredName()); if (pipeline != null) { tempDestination.put(DestConfig.PIPELINE.getPreferredName(), pipeline); @@ -75,12 +76,15 @@ public class PreviewTransformAction extends ActionType p.mapOrdered(), PREVIEW); PARSER.declareObject(Response::setMappings, (p, c) -> p.mapOrdered(), MAPPINGS); } + public Response() {} public Response(StreamInput in) throws IOException { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PutTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PutTransformAction.java index 734d1c9b0b8..5199ca0601e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PutTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PutTransformAction.java @@ -15,18 +15,16 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.indices.InvalidIndexNameException; +import org.elasticsearch.xpack.core.common.validation.SourceDestValidator; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.utils.TransformStrings; import java.io.IOException; -import java.util.Locale; import java.util.Objects; import static org.elasticsearch.action.ValidateActions.addValidationError; -import static org.elasticsearch.cluster.metadata.MetaDataCreateIndexService.validateIndexOrAliasName; public class PutTransformAction extends ActionType { @@ -71,46 +69,46 @@ public class PutTransformAction extends ActionType { @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; - if(config.getPivotConfig() != null + if (config.getPivotConfig() != null && config.getPivotConfig().getMaxPageSearchSize() != null && (config.getPivotConfig().getMaxPageSearchSize() < 10 || config.getPivotConfig().getMaxPageSearchSize() > 10_000)) { validationException = addValidationError( - "pivot.max_page_search_size [" + - config.getPivotConfig().getMaxPageSearchSize() + "] must be greater than 10 and less than 10,000", - validationException); + "pivot.max_page_search_size [" + + config.getPivotConfig().getMaxPageSearchSize() + + "] must be greater than 10 and less than 10,000", + validationException + ); } - for(String failure : config.getPivotConfig().aggFieldValidation()) { + for (String failure : config.getPivotConfig().aggFieldValidation()) { validationException = addValidationError(failure, validationException); } - String destIndex = config.getDestination().getIndex(); - try { - validateIndexOrAliasName(destIndex, InvalidIndexNameException::new); - if (!destIndex.toLowerCase(Locale.ROOT).equals(destIndex)) { - validationException = addValidationError("dest.index [" + destIndex +"] must be lowercase", validationException); - } - } catch (InvalidIndexNameException ex) { - validationException = addValidationError(ex.getMessage(), validationException); - } + + validationException = SourceDestValidator.validateRequest(validationException, config.getDestination().getIndex()); + if (TransformStrings.isValidId(config.getId()) == false) { validationException = addValidationError( TransformMessages.getMessage(TransformMessages.INVALID_ID, TransformField.ID.getPreferredName(), config.getId()), - validationException); + validationException + ); } if (TransformStrings.hasValidLengthForId(config.getId()) == false) { validationException = addValidationError( TransformMessages.getMessage(TransformMessages.ID_TOO_LONG, TransformStrings.ID_LENGTH_LIMIT), - validationException); + validationException + ); } TimeValue frequency = config.getFrequency(); if (frequency != null) { if (frequency.compareTo(MIN_FREQUENCY) < 0) { validationException = addValidationError( "minimum permitted [" + TransformField.FREQUENCY + "] is [" + MIN_FREQUENCY.getStringRep() + "]", - validationException); + validationException + ); } else if (frequency.compareTo(MAX_FREQUENCY) > 0) { validationException = addValidationError( "highest permitted [" + TransformField.FREQUENCY + "] is [" + MAX_FREQUENCY.getStringRep() + "]", - validationException); + validationException + ); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformAction.java index b8cc0294983..81fdad0df2b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/UpdateTransformAction.java @@ -16,17 +16,15 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.indices.InvalidIndexNameException; +import org.elasticsearch.xpack.core.common.validation.SourceDestValidator; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate; import java.io.IOException; -import java.util.Locale; import java.util.Objects; import static org.elasticsearch.action.ValidateActions.addValidationError; -import static org.elasticsearch.cluster.metadata.MetaDataCreateIndexService.validateIndexOrAliasName; public class UpdateTransformAction extends ActionType { @@ -46,7 +44,7 @@ public class UpdateTransformAction extends ActionType 0) { validationException = addValidationError( "highest permitted [" + TransformField.FREQUENCY + "] is [" + MAX_FREQUENCY.getStringRep() + "]", - validationException); + validationException + ); } } @@ -131,9 +126,7 @@ public class UpdateTransformAction extends ActionType void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + if (request instanceof XPackInfoRequest) { + XPackInfoResponse response = new XPackInfoResponse( + null, + new LicenseInfo("uid", license, license, licenseStatus, randomNonNegativeLong()), + null + ); + listener.onResponse((Response) response); + return; + } + super.doExecute(action, request, listener); + } + } + + @Before + public void setupComponents() { + clientWithBasicLicense = new MockClientLicenseCheck(getTestName(), "basic", LicenseStatus.ACTIVE); + clientWithExpiredBasicLicense = new MockClientLicenseCheck(getTestName(), "basic", LicenseStatus.EXPIRED); + remoteClusterLicenseCheckerBasic = new RemoteClusterLicenseChecker( + clientWithBasicLicense, + (operationMode -> operationMode != License.OperationMode.MISSING) + ); + clientWithPlatinumLicense = new MockClientLicenseCheck(getTestName(), "platinum", LicenseStatus.ACTIVE); + clientWithTrialLicense = new MockClientLicenseCheck(getTestName(), "trial", LicenseStatus.ACTIVE); + } + + @After + public void closeComponents() throws Exception { + clientWithBasicLicense.close(); + clientWithExpiredBasicLicense.close(); + clientWithPlatinumLicense.close(); + clientWithTrialLicense.close(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + + public void testCheck_GivenSimpleSourceIndexAndValidDestIndex() throws InterruptedException { + assertValidation( + listener -> simpleNonRemoteValidator.validate( + CLUSTER_STATE, + new String[] { SOURCE_1 }, + "dest", + SourceDestValidator.ALL_VALIDATIONS, + listener + ), + true, + null + ); + } + + public void testCheck_GivenNoSourceIndexAndValidDestIndex() throws InterruptedException { + assertValidation( + listener -> simpleNonRemoteValidator.validate( + CLUSTER_STATE, + new String[] {}, + "dest", + SourceDestValidator.ALL_VALIDATIONS, + listener + ), + (Boolean) null, + e -> { + assertEquals(1, e.validationErrors().size()); + assertThat(e.validationErrors().get(0), equalTo("Source index [] does not exist")); + } + ); + } + + public void testCheck_GivenMissingConcreteSourceIndex() throws InterruptedException { + assertValidation( + listener -> simpleNonRemoteValidator.validate( + CLUSTER_STATE, + new String[] { "missing" }, + "dest", + SourceDestValidator.ALL_VALIDATIONS, + listener + ), + (Boolean) null, + e -> { + assertEquals(1, e.validationErrors().size()); + assertThat(e.validationErrors().get(0), equalTo("no such index [missing]")); + } + ); + + assertValidation( + listener -> simpleNonRemoteValidator.validate( + CLUSTER_STATE, + new String[] { "missing" }, + "dest", + SourceDestValidator.NON_DEFERABLE_VALIDATIONS, + listener + ), + true, + null + ); + } + + public void testCheck_GivenMixedMissingAndExistingConcreteSourceIndex() throws InterruptedException { + assertValidation( + listener -> simpleNonRemoteValidator.validate( + CLUSTER_STATE, + new String[] { SOURCE_1, "missing" }, + "dest", + SourceDestValidator.ALL_VALIDATIONS, + listener + ), + (Boolean) null, + e -> { + assertEquals(1, e.validationErrors().size()); + assertThat(e.validationErrors().get(0), equalTo("no such index [missing]")); + } + ); + + assertValidation( + listener -> simpleNonRemoteValidator.validate( + CLUSTER_STATE, + new String[] { SOURCE_1, "missing" }, + "dest", + SourceDestValidator.NON_DEFERABLE_VALIDATIONS, + listener + ), + true, + null + ); + } + + public void testCheck_GivenMixedMissingWildcardExistingConcreteSourceIndex() throws InterruptedException { + assertValidation( + listener -> simpleNonRemoteValidator.validate( + CLUSTER_STATE, + new String[] { SOURCE_1, "wildcard*", "missing" }, + "dest", + SourceDestValidator.ALL_VALIDATIONS, + listener + ), + (Boolean) null, + e -> { + assertEquals(1, e.validationErrors().size()); + assertThat(e.validationErrors().get(0), equalTo("no such index [missing]")); + } + ); + + assertValidation( + listener -> simpleNonRemoteValidator.validate( + CLUSTER_STATE, + new String[] { SOURCE_1, "wildcard*", "missing" }, + "dest", + SourceDestValidator.NON_DEFERABLE_VALIDATIONS, + listener + ), + true, + null + ); + } + + public void testCheck_GivenWildcardSourceIndex() throws InterruptedException { + assertValidation( + listener -> simpleNonRemoteValidator.validate( + CLUSTER_STATE, + new String[] { "wildcard*" }, + "dest", + SourceDestValidator.ALL_VALIDATIONS, + listener + ), + true, + null + ); + } + + public void testCheck_GivenDestIndexSameAsSourceIndex() throws InterruptedException { + assertValidation( + listener -> simpleNonRemoteValidator.validate( + CLUSTER_STATE, + new String[] { SOURCE_1 }, + SOURCE_1, + SourceDestValidator.ALL_VALIDATIONS, + listener + ), + (Boolean) null, + e -> { + assertEquals(1, e.validationErrors().size()); + assertThat( + e.validationErrors().get(0), + equalTo("Destination index [" + SOURCE_1 + "] is included in source expression [" + SOURCE_1 + "]") + ); + } + ); + + assertValidation( + listener -> simpleNonRemoteValidator.validate( + CLUSTER_STATE, + new String[] { SOURCE_1 }, + SOURCE_1, + SourceDestValidator.NON_DEFERABLE_VALIDATIONS, + listener + ), + true, + null + ); + } + + public void testCheck_GivenDestIndexMatchesSourceIndex() throws InterruptedException { + assertValidation( + listener -> simpleNonRemoteValidator.validate( + CLUSTER_STATE, + new String[] { "source-*" }, + SOURCE_2, + SourceDestValidator.ALL_VALIDATIONS, + listener + ), + (Boolean) null, + e -> { + assertEquals(1, e.validationErrors().size()); + assertThat( + e.validationErrors().get(0), + equalTo("Destination index [" + SOURCE_2 + "] is included in source expression [source-*]") + ); + } + ); + + assertValidation( + listener -> simpleNonRemoteValidator.validate( + CLUSTER_STATE, + new String[] { "source-*" }, + SOURCE_2, + SourceDestValidator.NON_DEFERABLE_VALIDATIONS, + listener + ), + true, + null + ); + } + + public void testCheck_GivenDestIndexMatchesOneOfSourceIndices() throws InterruptedException { + assertValidation( + listener -> simpleNonRemoteValidator.validate( + CLUSTER_STATE, + new String[] { "source-1", "source-*" }, + SOURCE_2, + SourceDestValidator.ALL_VALIDATIONS, + listener + ), + (Boolean) null, + e -> { + assertEquals(1, e.validationErrors().size()); + assertThat( + e.validationErrors().get(0), + equalTo("Destination index [" + SOURCE_2 + "] is included in source expression [source-*]") + ); + } + ); + + assertValidation( + listener -> simpleNonRemoteValidator.validate( + CLUSTER_STATE, + new String[] { "source-1", "source-*" }, + SOURCE_2, + SourceDestValidator.NON_DEFERABLE_VALIDATIONS, + listener + ), + true, + null + ); + } + + public void testCheck_GivenDestIndexMatchesMultipleSourceIndices() throws InterruptedException { + assertValidation( + listener -> simpleNonRemoteValidator.validate( + CLUSTER_STATE, + new String[] { "source-1", "source-*", "sou*" }, + SOURCE_2, + SourceDestValidator.ALL_VALIDATIONS, + listener + ), + (Boolean) null, + e -> { + assertEquals(2, e.validationErrors().size()); + assertThat( + e.validationErrors().get(0), + equalTo("Destination index [" + SOURCE_2 + "] is included in source expression [source-*]") + ); + assertThat( + e.validationErrors().get(1), + equalTo("Destination index [" + SOURCE_2 + "] is included in source expression [sou*]") + ); + } + ); + } + + public void testCheck_GivenDestIndexIsAliasThatMatchesMultipleIndices() throws InterruptedException { + assertValidation( + listener -> simpleNonRemoteValidator.validate( + CLUSTER_STATE, + new String[] { SOURCE_1 }, + DEST_ALIAS, + SourceDestValidator.ALL_VALIDATIONS, + listener + ), + (Boolean) null, + e -> { + assertEquals(1, e.validationErrors().size()); + assertThat( + e.validationErrors().get(0), + equalTo( + "no write index is defined for alias [dest-alias]. " + + "The write index may be explicitly disabled using is_write_index=false or the alias points " + + "to multiple indices without one being designated as a write index" + ) + ); + } + ); + + assertValidation( + listener -> simpleNonRemoteValidator.validate( + CLUSTER_STATE, + new String[] { SOURCE_1 }, + DEST_ALIAS, + SourceDestValidator.NON_DEFERABLE_VALIDATIONS, + listener + ), + (Boolean) null, + e -> { + assertEquals(1, e.validationErrors().size()); + assertThat( + e.validationErrors().get(0), + equalTo( + "no write index is defined for alias [dest-alias]. " + + "The write index may be explicitly disabled using is_write_index=false or the alias points " + + "to multiple indices without one being designated as a write index" + ) + ); + } + ); + } + + public void testCheck_GivenDestIndexIsAliasThatMatchesMultipleIndicesButHasSingleWriteAlias() throws InterruptedException { + assertValidation( + listener -> simpleNonRemoteValidator.validate( + CLUSTER_STATE, + new String[] { SOURCE_1 }, + ALIAS_READ_WRITE_DEST, + SourceDestValidator.ALL_VALIDATIONS, + listener + ), + (Boolean) null, + e -> { + assertEquals(1, e.validationErrors().size()); + assertThat( + e.validationErrors().get(0), + equalTo( + "no write index is defined for alias [" + + ALIAS_READ_WRITE_DEST + + "]. " + + "The write index may be explicitly disabled using is_write_index=false or the alias points " + + "to multiple indices without one being designated as a write index" + ) + ); + } + ); + } + + public void testCheck_GivenDestIndexIsAliasThatIsIncludedInSource() throws InterruptedException { + assertValidation( + listener -> simpleNonRemoteValidator.validate( + CLUSTER_STATE, + new String[] { SOURCE_1 }, + SOURCE_1_ALIAS, + SourceDestValidator.ALL_VALIDATIONS, + listener + ), + (Boolean) null, + e -> { + assertEquals(1, e.validationErrors().size()); + assertThat( + e.validationErrors().get(0), + equalTo("Destination index [" + SOURCE_1 + "] is included in source expression [" + SOURCE_1 + "]") + ); + } + ); + + assertValidation( + listener -> simpleNonRemoteValidator.validate( + CLUSTER_STATE, + new String[] { SOURCE_1 }, + SOURCE_1_ALIAS, + SourceDestValidator.NON_DEFERABLE_VALIDATIONS, + listener + ), + true, + null + ); + } + + public void testCheck_MultipleValidationErrors() throws InterruptedException { + assertValidation( + listener -> simpleNonRemoteValidator.validate( + CLUSTER_STATE, + new String[] { SOURCE_1, "missing" }, + SOURCE_1_ALIAS, + SourceDestValidator.ALL_VALIDATIONS, + listener + ), + (Boolean) null, + e -> { + assertEquals(2, e.validationErrors().size()); + assertThat(e.validationErrors().get(0), equalTo("no such index [missing]")); + assertThat( + e.validationErrors().get(1), + equalTo("Destination index [" + SOURCE_1 + "] is included in source expression [missing,source-1]") + ); + } + ); + } + + // CCS tests: at time of writing it wasn't possible to mock RemoteClusterService, therefore it's not possible + // to test the whole validation but test RemoteSourceEnabledAndRemoteLicenseValidation + public void testRemoteSourceBasic() throws InterruptedException { + Context context = spy( + new SourceDestValidator.Context( + CLUSTER_STATE, + new IndexNameExpressionResolver(), + remoteClusterService, + remoteClusterLicenseCheckerBasic, + new String[] { REMOTE_BASIC + ":" + "SOURCE_1" }, + "dest", + "node_id", + "license" + ) + ); + + when(context.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_BASIC)); + RemoteSourceEnabledAndRemoteLicenseValidation validator = new RemoteSourceEnabledAndRemoteLicenseValidation(); + + assertValidationWithContext( + listener -> validator.validate(context, listener), + c -> { assertNull(c.getValidationException()); }, + null + ); + } + + public void testRemoteSourcePlatinum() throws InterruptedException { + final Context context = spy( + new SourceDestValidator.Context( + CLUSTER_STATE, + new IndexNameExpressionResolver(), + remoteClusterService, + new RemoteClusterLicenseChecker(clientWithBasicLicense, XPackLicenseState::isPlatinumOrTrialOperationMode), + new String[] { REMOTE_BASIC + ":" + "SOURCE_1" }, + "dest", + "node_id", + "platinum" + ) + ); + + when(context.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_BASIC)); + final RemoteSourceEnabledAndRemoteLicenseValidation validator = new RemoteSourceEnabledAndRemoteLicenseValidation(); + + assertValidationWithContext(listener -> validator.validate(context, listener), c -> { + assertNotNull(c.getValidationException()); + assertEquals(1, c.getValidationException().validationErrors().size()); + assertThat( + c.getValidationException().validationErrors().get(0), + equalTo( + "License check failed for remote cluster alias [" + + REMOTE_BASIC + + "], at least a [platinum] license is required, found license [basic]" + ) + ); + }, null); + + final Context context2 = spy( + new SourceDestValidator.Context( + CLUSTER_STATE, + new IndexNameExpressionResolver(), + remoteClusterService, + new RemoteClusterLicenseChecker(clientWithPlatinumLicense, XPackLicenseState::isPlatinumOrTrialOperationMode), + new String[] { REMOTE_PLATINUM + ":" + "SOURCE_1" }, + "dest", + "node_id", + "license" + ) + ); + when(context2.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_PLATINUM)); + + assertValidationWithContext( + listener -> validator.validate(context2, listener), + c -> { assertNull(c.getValidationException()); }, + null + ); + + final Context context3 = spy( + new SourceDestValidator.Context( + CLUSTER_STATE, + new IndexNameExpressionResolver(), + remoteClusterService, + new RemoteClusterLicenseChecker(clientWithPlatinumLicense, XPackLicenseState::isPlatinumOrTrialOperationMode), + new String[] { REMOTE_PLATINUM + ":" + "SOURCE_1" }, + "dest", + "node_id", + "platinum" + ) + ); + when(context3.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_PLATINUM)); + + final RemoteSourceEnabledAndRemoteLicenseValidation validator3 = new RemoteSourceEnabledAndRemoteLicenseValidation(); + assertValidationWithContext( + listener -> validator3.validate(context3, listener), + c -> { assertNull(c.getValidationException()); }, + null + ); + + final Context context4 = spy( + new SourceDestValidator.Context( + CLUSTER_STATE, + new IndexNameExpressionResolver(), + remoteClusterService, + new RemoteClusterLicenseChecker(clientWithTrialLicense, XPackLicenseState::isPlatinumOrTrialOperationMode), + new String[] { REMOTE_PLATINUM + ":" + "SOURCE_1" }, + "dest", + "node_id", + "trial" + ) + ); + when(context4.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_PLATINUM)); + + final RemoteSourceEnabledAndRemoteLicenseValidation validator4 = new RemoteSourceEnabledAndRemoteLicenseValidation(); + assertValidationWithContext( + listener -> validator4.validate(context4, listener), + c -> { assertNull(c.getValidationException()); }, + null + ); + } + + public void testRemoteSourceLicenseInActive() throws InterruptedException { + final Context context = spy( + new SourceDestValidator.Context( + CLUSTER_STATE, + new IndexNameExpressionResolver(), + remoteClusterService, + new RemoteClusterLicenseChecker(clientWithExpiredBasicLicense, XPackLicenseState::isPlatinumOrTrialOperationMode), + new String[] { REMOTE_BASIC + ":" + "SOURCE_1" }, + "dest", + "node_id", + "license" + ) + ); + + when(context.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_BASIC)); + final RemoteSourceEnabledAndRemoteLicenseValidation validator = new RemoteSourceEnabledAndRemoteLicenseValidation(); + assertValidationWithContext(listener -> validator.validate(context, listener), c -> { + assertNotNull(c.getValidationException()); + assertEquals(1, c.getValidationException().validationErrors().size()); + assertThat( + c.getValidationException().validationErrors().get(0), + equalTo("License check failed for remote cluster alias [" + REMOTE_BASIC + "], license is not active") + ); + }, null); + } + + public void testRemoteSourceDoesNotExist() throws InterruptedException { + Context context = spy( + new SourceDestValidator.Context( + CLUSTER_STATE, + new IndexNameExpressionResolver(), + remoteClusterService, + new RemoteClusterLicenseChecker(clientWithExpiredBasicLicense, XPackLicenseState::isPlatinumOrTrialOperationMode), + new String[] { "non_existing_remote:" + "SOURCE_1" }, + "dest", + "node_id", + "license" + ) + ); + + when(context.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_BASIC)); + RemoteSourceEnabledAndRemoteLicenseValidation validator = new RemoteSourceEnabledAndRemoteLicenseValidation(); + + assertValidationWithContext(listener -> validator.validate(context, listener), c -> { + assertNotNull(c.getValidationException()); + assertEquals(1, c.getValidationException().validationErrors().size()); + assertThat(c.getValidationException().validationErrors().get(0), equalTo("no such remote cluster: [non_existing_remote]")); + }, null); + } + + public void testRequestValidation() { + ActionRequestValidationException validationException = SourceDestValidator.validateRequest(null, "UPPERCASE"); + assertNotNull(validationException); + assertEquals(1, validationException.validationErrors().size()); + assertThat(validationException.validationErrors().get(0), equalTo("Destination index [UPPERCASE] must be lowercase")); + + validationException = SourceDestValidator.validateRequest(null, "remote:dest"); + assertNotNull(validationException); + assertEquals(1, validationException.validationErrors().size()); + assertThat(validationException.validationErrors().get(0), equalTo("Invalid index name [remote:dest], must not contain ':'")); + + validationException = SourceDestValidator.validateRequest(null, "dest"); + assertNull(validationException); + + validationException = new ActionRequestValidationException(); + validationException.addValidationError("error1"); + validationException.addValidationError("error2"); + validationException = SourceDestValidator.validateRequest(validationException, "dest"); + assertNotNull(validationException); + assertEquals(2, validationException.validationErrors().size()); + assertEquals(validationException.validationErrors().get(0), "error1"); + assertEquals(validationException.validationErrors().get(1), "error2"); + + validationException = SourceDestValidator.validateRequest(validationException, "UPPERCASE"); + assertNotNull(validationException); + assertEquals(3, validationException.validationErrors().size()); + assertThat(validationException.validationErrors().get(2), equalTo("Destination index [UPPERCASE] must be lowercase")); + } + + private void assertValidation(Consumer> function, T expected, Consumer onException) + throws InterruptedException { + + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean listenerCalled = new AtomicBoolean(false); + + LatchedActionListener listener = new LatchedActionListener<>(ActionListener.wrap(r -> { + assertTrue("listener called more than once", listenerCalled.compareAndSet(false, true)); + if (expected == null) { + fail("expected an exception but got a response"); + } else { + assertThat(r, equalTo(expected)); + } + }, e -> { + assertTrue("listener called more than once", listenerCalled.compareAndSet(false, true)); + if (onException == null) { + logger.error("got unexpected exception", e); + fail("got unexpected exception: " + e.getMessage()); + } else if (e instanceof ValidationException) { + onException.accept((ValidationException) e); + } else { + fail("got unexpected exception type: " + e); + } + }), latch); + + function.accept(listener); + assertTrue("timed out after 20s", latch.await(20, TimeUnit.SECONDS)); + } + + private void assertValidationWithContext( + Consumer> function, + CheckedConsumer onAnswer, + Consumer onException + ) throws InterruptedException { + + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean listenerCalled = new AtomicBoolean(false); + + LatchedActionListener listener = new LatchedActionListener<>(ActionListener.wrap(r -> { + assertTrue("listener called more than once", listenerCalled.compareAndSet(false, true)); + if (onAnswer == null) { + fail("expected an exception but got a response"); + } else { + onAnswer.accept(r); + } + }, e -> { + assertTrue("listener called more than once", listenerCalled.compareAndSet(false, true)); + if (onException == null) { + logger.error("got unexpected exception", e); + fail("got unexpected exception: " + e.getMessage()); + } else if (e instanceof ValidationException) { + onException.accept((ValidationException) e); + } else { + fail("got unexpected exception type: " + e); + } + }), latch); + + function.accept(listener); + assertTrue("timed out after 20s", latch.await(20, TimeUnit.SECONDS)); + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/preview_transforms.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/preview_transforms.yml index 35289c2bbd0..cf139b04b44 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/preview_transforms.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/preview_transforms.yml @@ -166,7 +166,7 @@ setup: --- "Test preview with non-existing source index": - do: - catch: /Source index \[does_not_exist\] does not exist/ + catch: /.*reason=Validation Failed.* no such index \[does_not_exist\]/ transform.preview_transform: body: > { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_crud.yml index fd9f75735e2..1968e1fa431 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_crud.yml @@ -79,7 +79,7 @@ setup: --- "Test put transform with invalid source index": - do: - catch: /Source index \[missing-index\] does not exist/ + catch: /.*reason=Validation Failed.* no such index \[missing-index\]/ transform.put_transform: transform_id: "missing-source-transform" body: > @@ -384,7 +384,7 @@ setup: name: source-index - do: - catch: /Destination index \[created-destination-index\] is included in source expression \[airline-data,created-destination-index\]/ + catch: /.*reason=Validation Failed.* Destination index \[created-destination-index\] is included in source expression \[airline-data,created-destination-index\]/ transform.put_transform: transform_id: "transform-from-aliases-failures" body: > @@ -410,7 +410,7 @@ setup: name: dest-index - do: - catch: /Destination index \[dest-index\] should refer to a single index/ + catch: /.*reason=Validation Failed.* no write index is defined for alias [dest2-index].*/ transform.put_transform: transform_id: "airline-transform" body: > @@ -521,7 +521,7 @@ setup: --- "Test invalid destination index name": - do: - catch: /dest\.index \[DeStInAtIoN\] must be lowercase/ + catch: /.*reason=Validation Failed.* Destination index \[DeStInAtIoN\] must be lowercase/ transform.put_transform: transform_id: "airline-transform" body: > diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_update.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_update.yml index 5b054a27fa3..f55fcd2cb07 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_update.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_update.yml @@ -67,7 +67,7 @@ setup: --- "Test put transform with invalid source index": - do: - catch: /Source index \[missing-index\] does not exist/ + catch: /.*reason=Validation Failed.* no such index \[missing-index\]/ transform.update_transform: transform_id: "updating-airline-transform" body: > @@ -255,7 +255,7 @@ setup: name: source2-index - do: - catch: /Destination index \[created-destination-index\] is included in source expression \[created-destination-index\]/ + catch: /.*reason=Validation Failed.* Destination index \[created-destination-index\] is included in source expression \[created-destination-index\]/ transform.update_transform: transform_id: "updating-airline-transform" body: > @@ -280,7 +280,7 @@ setup: index: created-destination-index name: dest2-index - do: - catch: /Destination index \[dest2-index\] should refer to a single index/ + catch: /.*reason=Validation Failed.* no write index is defined for alias [dest2-index].*/ transform.update_transform: transform_id: "updating-airline-transform" body: > @@ -290,7 +290,7 @@ setup: --- "Test invalid destination index name": - do: - catch: /dest\.index \[DeStInAtIoN\] must be lowercase/ + catch: /.*reason=Validation Failed.* Destination index \[DeStInAtIoN\] must be lowercase/ transform.update_transform: transform_id: "updating-airline-transform" body: > @@ -298,7 +298,7 @@ setup: "dest": { "index": "DeStInAtIoN" } } - do: - catch: /Invalid index name \[destination#dest\], must not contain \'#\'/ + catch: /.*reason=Validation Failed.* Invalid index name \[destination#dest\], must not contain \'#\'/ transform.update_transform: transform_id: "updating-airline-transform" body: > diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java index a5615391aa9..583904044e8 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java @@ -17,29 +17,33 @@ import org.elasticsearch.action.ingest.SimulatePipelineResponse; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.license.License; import org.elasticsearch.license.LicenseUtils; +import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.XPackField; +import org.elasticsearch.xpack.core.common.validation.SourceDestValidator; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction; @@ -66,8 +70,8 @@ public class TransportPreviewTransformAction extends private final XPackLicenseState licenseState; private final Client client; private final ThreadPool threadPool; - private final IndexNameExpressionResolver indexNameExpressionResolver; private final ClusterService clusterService; + private final SourceDestValidator sourceDestValidator; @Inject public TransportPreviewTransformAction( @@ -77,7 +81,8 @@ public class TransportPreviewTransformAction extends ThreadPool threadPool, XPackLicenseState licenseState, IndexNameExpressionResolver indexNameExpressionResolver, - ClusterService clusterService + ClusterService clusterService, + Settings settings ) { this( PreviewTransformAction.NAME, @@ -87,7 +92,8 @@ public class TransportPreviewTransformAction extends threadPool, licenseState, indexNameExpressionResolver, - clusterService + clusterService, + settings ); } @@ -99,14 +105,23 @@ public class TransportPreviewTransformAction extends ThreadPool threadPool, XPackLicenseState licenseState, IndexNameExpressionResolver indexNameExpressionResolver, - ClusterService clusterService + ClusterService clusterService, + Settings settings ) { super(name, transportService, actionFilters, PreviewTransformAction.Request::new); this.licenseState = licenseState; this.client = client; this.threadPool = threadPool; this.clusterService = clusterService; - this.indexNameExpressionResolver = indexNameExpressionResolver; + this.sourceDestValidator = new SourceDestValidator( + indexNameExpressionResolver, + transportService.getRemoteClusterService(), + RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings) + ? new RemoteClusterLicenseChecker(client, XPackLicenseState::isTransformAllowedForOperationMode) + : null, + clusterService.getNodeName(), + License.OperationMode.BASIC.description() + ); } @Override @@ -119,39 +134,41 @@ public class TransportPreviewTransformAction extends ClusterState clusterState = clusterService.state(); final TransformConfig config = request.getConfig(); - for (String src : config.getSource().getIndex()) { - String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), src); - if (concreteNames.length == 0) { - listener.onFailure( - new ElasticsearchStatusException( - TransformMessages.getMessage(TransformMessages.REST_PUT_TRANSFORM_SOURCE_INDEX_MISSING, src), - RestStatus.BAD_REQUEST - ) - ); - return; - } - } - Pivot pivot = new Pivot(config.getPivotConfig()); - try { - pivot.validateConfig(); - } catch (ElasticsearchStatusException e) { - listener.onFailure( - new ElasticsearchStatusException(TransformMessages.REST_PUT_TRANSFORM_FAILED_TO_VALIDATE_CONFIGURATION, e.status(), e) - ); - return; - } catch (Exception e) { - listener.onFailure( - new ElasticsearchStatusException( - TransformMessages.REST_PUT_TRANSFORM_FAILED_TO_VALIDATE_CONFIGURATION, - RestStatus.INTERNAL_SERVER_ERROR, - e - ) - ); - return; - } + sourceDestValidator.validate( + clusterState, + config.getSource().getIndex(), + config.getDestination().getIndex(), + SourceDestValidator.PREVIEW_VALIDATIONS, + ActionListener.wrap(r -> { - getPreview(pivot, config.getSource(), config.getDestination().getPipeline(), config.getDestination().getIndex(), listener); + Pivot pivot = new Pivot(config.getPivotConfig()); + try { + pivot.validateConfig(); + } catch (ElasticsearchStatusException e) { + listener.onFailure( + new ElasticsearchStatusException( + TransformMessages.REST_PUT_TRANSFORM_FAILED_TO_VALIDATE_CONFIGURATION, + e.status(), + e + ) + ); + return; + } catch (Exception e) { + listener.onFailure( + new ElasticsearchStatusException( + TransformMessages.REST_PUT_TRANSFORM_FAILED_TO_VALIDATE_CONFIGURATION, + RestStatus.INTERNAL_SERVER_ERROR, + e + ) + ); + return; + } + + getPreview(pivot, config.getSource(), config.getDestination().getPipeline(), config.getDestination().getIndex(), listener); + + }, listener::onFailure) + ); } @SuppressWarnings("unchecked") @@ -216,10 +233,7 @@ public class TransportPreviewTransformAction extends builder.startObject(); builder.field("docs", results); builder.endObject(); - SimulatePipelineRequest pipelineRequest = new SimulatePipelineRequest( - BytesReference.bytes(builder), - XContentType.JSON - ); + SimulatePipelineRequest pipelineRequest = new SimulatePipelineRequest(BytesReference.bytes(builder), XContentType.JSON); pipelineRequest.setId(pipeline); ClientHelper.executeAsyncWithOrigin( client, diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java index 7b9c1040480..505d4ffe5fd 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java @@ -26,16 +26,20 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.license.License; import org.elasticsearch.license.LicenseUtils; +import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.common.validation.SourceDestValidator; import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest; @@ -50,7 +54,6 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; -import org.elasticsearch.xpack.transform.transforms.SourceDestValidator; import org.elasticsearch.xpack.transform.transforms.pivot.Pivot; import java.io.IOException; @@ -69,6 +72,7 @@ public class TransportPutTransformAction extends TransportMasterNodeAction privResponseListener = ActionListener.wrap( - r -> handlePrivsResponse(username, request, r, listener), + sourceDestValidator.validate( + clusterState, + config.getSource().getIndex(), + config.getDestination().getIndex(), + request.isDeferValidation() ? SourceDestValidator.NON_DEFERABLE_VALIDATIONS : SourceDestValidator.ALL_VALIDATIONS, + ActionListener.wrap( + validationResponse -> { + // Early check to verify that the user can create the destination index and can read from the source + if (licenseState.isAuthAllowed() && request.isDeferValidation() == false) { + final String username = securityContext.getUser().principal(); + HasPrivilegesRequest privRequest = buildPrivilegeCheck(config, indexNameExpressionResolver, clusterState, username); + ActionListener privResponseListener = ActionListener.wrap( + r -> handlePrivsResponse(username, request, r, listener), + listener::onFailure + ); + + client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener); + } else { // No security enabled, just create the transform + putTransform(request, listener); + } + }, listener::onFailure - ); - - client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener); - } else { // No security enabled, just create the transform - putTransform(request, listener); - } + ) + ); } @Override diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java index 8441333b743..cc55a3705ca 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java @@ -24,16 +24,21 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.license.License; import org.elasticsearch.license.LicenseUtils; +import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.XPackField; +import org.elasticsearch.xpack.core.common.validation.SourceDestValidator; import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.action.StartTransformAction; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; @@ -44,7 +49,6 @@ import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import org.elasticsearch.xpack.transform.persistence.TransformIndex; -import org.elasticsearch.xpack.transform.transforms.SourceDestValidator; import org.elasticsearch.xpack.transform.transforms.pivot.Pivot; import java.io.IOException; @@ -65,6 +69,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction transformTaskHolder = new AtomicReference<>(); + final AtomicReference transformConfigHolder = new AtomicReference<>(); - // <4> Wait for the allocated task's state to STARTED + // <5> Wait for the allocated task's state to STARTED ActionListener> newPersistentTaskActionListener = ActionListener .wrap(task -> { TransformTaskParams transformTask = transformTaskHolder.get(); @@ -155,7 +173,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction Create the task in cluster state so that it will start executing on the node + // <4> Create the task in cluster state so that it will start executing on the node ActionListener createOrGetIndexListener = ActionListener.wrap(unused -> { TransformTaskParams transformTask = transformTaskHolder.get(); assert transformTask != null; @@ -192,26 +210,13 @@ public class TransportStartTransformAction extends TransportMasterNodeAction If the destination index exists, start the task, otherwise deduce our mappings for the destination index and create it - ActionListener getTransformListener = ActionListener.wrap(config -> { - if (config.isValid() == false) { - listener.onFailure( - new ElasticsearchStatusException( - TransformMessages.getMessage(TransformMessages.TRANSFORM_CONFIG_INVALID, request.getId()), - RestStatus.BAD_REQUEST - ) - ); - return; - } - // Validate source and destination indices - SourceDestValidator.validate(config, clusterService.state(), indexNameExpressionResolver, false); - - transformTaskHolder.set(createTransform(config.getId(), config.getVersion(), config.getFrequency())); - final String destinationIndex = config.getDestination().getIndex(); + ActionListener validationListener = ActionListener.wrap(validationResponse -> { + final String destinationIndex = transformConfigHolder.get().getDestination().getIndex(); String[] dest = indexNameExpressionResolver.concreteIndexNames(state, IndicesOptions.lenientExpandOpen(), destinationIndex); if (dest.length == 0) { auditor.info(request.getId(), "Creating destination index [" + destinationIndex + "] with deduced mappings."); - createDestinationIndex(config, createOrGetIndexListener); + createDestinationIndex(transformConfigHolder.get(), createOrGetIndexListener); } else { auditor.info(request.getId(), "Using existing destination index [" + destinationIndex + "]."); ClientHelper.executeAsyncWithOrigin( @@ -238,6 +243,29 @@ public class TransportStartTransformAction extends TransportMasterNodeAction run transform validations + ActionListener getTransformListener = ActionListener.wrap(config -> { + if (config.isValid() == false) { + listener.onFailure( + new ElasticsearchStatusException( + TransformMessages.getMessage(TransformMessages.TRANSFORM_CONFIG_INVALID, request.getId()), + RestStatus.BAD_REQUEST + ) + ); + return; + } + transformTaskHolder.set(createTransform(config.getId(), config.getVersion(), config.getFrequency())); + transformConfigHolder.set(config); + + sourceDestValidator.validate( + clusterService.state(), + config.getSource().getIndex(), + config.getDestination().getIndex(), + SourceDestValidator.ALL_VALIDATIONS, + validationListener + ); + }, listener::onFailure); + // <1> Get the config to verify it exists and is valid transformConfigManager.getTransformConfiguration(request.getId(), getTransformListener); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java index 802eac68e86..8fce7081aed 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java @@ -23,16 +23,20 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.license.License; import org.elasticsearch.license.LicenseUtils; +import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.common.validation.SourceDestValidator; import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest; @@ -50,7 +54,6 @@ import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import org.elasticsearch.xpack.transform.persistence.TransformIndex; -import org.elasticsearch.xpack.transform.transforms.SourceDestValidator; import org.elasticsearch.xpack.transform.transforms.pivot.Pivot; import java.io.IOException; @@ -69,6 +72,7 @@ public class TransportUpdateTransformAction extends TransportMasterNodeAction { + checkPriviledgesAndUpdateTransform(request, clusterState, updatedConfig, configAndVersion.v2(), listener); + }, + listener::onFailure + ) + ); + }, listener::onFailure)); } @@ -196,20 +221,13 @@ public class TransportUpdateTransformAction extends TransportMasterNodeAction listener ) { - try { - SourceDestValidator.validate(config, clusterState, indexNameExpressionResolver, request.isDeferValidation()); - } catch (ElasticsearchStatusException ex) { - listener.onFailure(ex); - return; - } - // Early check to verify that the user can create the destination index and can read from the source if (licenseState.isAuthAllowed() && request.isDeferValidation() == false) { final String username = securityContext.getUser().principal(); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportPreviewTransformActionDeprecated.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportPreviewTransformActionDeprecated.java index 8882fcf505b..d1ad8ac09f7 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportPreviewTransformActionDeprecated.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportPreviewTransformActionDeprecated.java @@ -11,6 +11,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -20,12 +21,27 @@ import org.elasticsearch.xpack.transform.action.TransportPreviewTransformAction; public class TransportPreviewTransformActionDeprecated extends TransportPreviewTransformAction { @Inject - public TransportPreviewTransformActionDeprecated(TransportService transportService, ActionFilters actionFilters, - Client client, ThreadPool threadPool, XPackLicenseState licenseState, - IndexNameExpressionResolver indexNameExpressionResolver, - ClusterService clusterService) { - super(PreviewTransformActionDeprecated.NAME, transportService, actionFilters, client, threadPool, licenseState, - indexNameExpressionResolver, clusterService); + public TransportPreviewTransformActionDeprecated( + TransportService transportService, + ActionFilters actionFilters, + Client client, + ThreadPool threadPool, + XPackLicenseState licenseState, + IndexNameExpressionResolver indexNameExpressionResolver, + ClusterService clusterService, + Settings settings + ) { + super( + PreviewTransformActionDeprecated.NAME, + transportService, + actionFilters, + client, + threadPool, + licenseState, + indexNameExpressionResolver, + clusterService, + settings + ); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportStartTransformActionDeprecated.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportStartTransformActionDeprecated.java index 17a996f760e..caef1b1edf1 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportStartTransformActionDeprecated.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportStartTransformActionDeprecated.java @@ -11,6 +11,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.threadpool.ThreadPool; @@ -31,7 +32,8 @@ public class TransportStartTransformActionDeprecated extends TransportStartTrans IndexNameExpressionResolver indexNameExpressionResolver, TransformServices transformServices, PersistentTasksService persistentTasksService, - Client client + Client client, + Settings settings ) { super( StartTransformActionDeprecated.NAME, @@ -43,7 +45,8 @@ public class TransportStartTransformActionDeprecated extends TransportStartTrans indexNameExpressionResolver, transformServices, persistentTasksService, - client + client, + settings ); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/SourceDestValidator.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/SourceDestValidator.java deleted file mode 100644 index 8c89ffd6d5f..00000000000 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/SourceDestValidator.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.transform.transforms; - -import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.regex.Regex; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.xpack.core.transform.TransformMessages; -import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; - -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -/** - * This class contains more complex validations in regards to how {@link TransformConfig#getSource()} and - * {@link TransformConfig#getDestination()} relate to each other. - */ -public final class SourceDestValidator { - - interface SourceDestValidation { - boolean isDeferrable(); - void validate(TransformConfig config, ClusterState clusterState, IndexNameExpressionResolver indexNameExpressionResolver); - } - - private static final List VALIDATIONS = Arrays.asList(new SourceMissingValidation(), - new DestinationInSourceValidation(), - new DestinationSingleIndexValidation()); - - /** - * Validates the DataFrameTransformConfiguration source and destination indices. - * - * A simple name validation is done on {@link TransformConfig#getDestination()} inside - * {@link org.elasticsearch.xpack.core.transform.action.PutTransformAction} - * - * So, no need to do the name checks here. - * - * @param config DataFrameTransformConfig to validate - * @param clusterState The current ClusterState - * @param indexNameExpressionResolver A valid IndexNameExpressionResolver object - * @throws ElasticsearchStatusException when a validation fails - */ - public static void validate(TransformConfig config, - ClusterState clusterState, - IndexNameExpressionResolver indexNameExpressionResolver, - boolean shouldDefer) { - for (SourceDestValidation validation : VALIDATIONS) { - if (shouldDefer && validation.isDeferrable()) { - continue; - } - validation.validate(config, clusterState, indexNameExpressionResolver); - } - } - - static class SourceMissingValidation implements SourceDestValidation { - - @Override - public boolean isDeferrable() { - return true; - } - - @Override - public void validate(TransformConfig config, - ClusterState clusterState, - IndexNameExpressionResolver indexNameExpressionResolver) { - for(String src : config.getSource().getIndex()) { - String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState, - IndicesOptions.lenientExpandOpen(), - src); - if (concreteNames.length == 0) { - throw new ElasticsearchStatusException( - TransformMessages.getMessage(TransformMessages.REST_PUT_TRANSFORM_SOURCE_INDEX_MISSING, src), - RestStatus.BAD_REQUEST); - } - } - } - } - - static class DestinationInSourceValidation implements SourceDestValidation { - - @Override - public boolean isDeferrable() { - return true; - } - - @Override - public void validate(TransformConfig config, - ClusterState clusterState, - IndexNameExpressionResolver indexNameExpressionResolver) { - final String destIndex = config.getDestination().getIndex(); - Set concreteSourceIndexNames = new HashSet<>(); - for(String src : config.getSource().getIndex()) { - String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState, - IndicesOptions.lenientExpandOpen(), - src); - if (Regex.simpleMatch(src, destIndex)) { - throw new ElasticsearchStatusException( - TransformMessages.getMessage(TransformMessages.REST_PUT_TRANSFORM_DEST_IN_SOURCE, destIndex, src), - RestStatus.BAD_REQUEST); - } - concreteSourceIndexNames.addAll(Arrays.asList(concreteNames)); - } - - if (concreteSourceIndexNames.contains(destIndex)) { - throw new ElasticsearchStatusException( - TransformMessages.getMessage(TransformMessages.REST_PUT_TRANSFORM_DEST_IN_SOURCE, - destIndex, - Strings.arrayToCommaDelimitedString(config.getSource().getIndex())), - RestStatus.BAD_REQUEST - ); - } - - final String[] concreteDest = indexNameExpressionResolver.concreteIndexNames(clusterState, - IndicesOptions.lenientExpandOpen(), - destIndex); - if (concreteDest.length > 0 && concreteSourceIndexNames.contains(concreteDest[0])) { - throw new ElasticsearchStatusException( - TransformMessages.getMessage(TransformMessages.REST_PUT_TRANSFORM_DEST_IN_SOURCE, - concreteDest[0], - Strings.arrayToCommaDelimitedString(concreteSourceIndexNames.toArray(new String[0]))), - RestStatus.BAD_REQUEST - ); - } - } - } - - static class DestinationSingleIndexValidation implements SourceDestValidation { - - @Override - public boolean isDeferrable() { - return false; - } - - @Override - public void validate(TransformConfig config, - ClusterState clusterState, - IndexNameExpressionResolver indexNameExpressionResolver) { - final String destIndex = config.getDestination().getIndex(); - final String[] concreteDest = - indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), destIndex); - - if (concreteDest.length > 1) { - throw new ElasticsearchStatusException( - TransformMessages.getMessage(TransformMessages.REST_PUT_TRANSFORM_DEST_SINGLE_INDEX, destIndex), - RestStatus.BAD_REQUEST - ); - } - } - } -} diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/SourceDestValidatorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/SourceDestValidatorTests.java deleted file mode 100644 index add6a0c27b9..00000000000 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/SourceDestValidatorTests.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.transform.transforms; - -import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.AliasMetaData; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; -import org.elasticsearch.xpack.core.transform.transforms.DestConfig; -import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; -import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfigTests; - -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE; -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED; -import static org.hamcrest.Matchers.equalTo; - -public class SourceDestValidatorTests extends ESTestCase { - - private static final String SOURCE_1 = "source-1"; - private static final String SOURCE_2 = "source-2"; - private static final String ALIASED_DEST = "aliased-dest"; - - private static final ClusterState CLUSTER_STATE; - - static { - IndexMetaData source1 = IndexMetaData.builder(SOURCE_1).settings(Settings.builder() - .put(SETTING_VERSION_CREATED, Version.CURRENT) - .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0) - .put(SETTING_CREATION_DATE, System.currentTimeMillis())) - .putAlias(AliasMetaData.builder("source-1-alias").build()) - .build(); - IndexMetaData source2 = IndexMetaData.builder(SOURCE_2).settings(Settings.builder() - .put(SETTING_VERSION_CREATED, Version.CURRENT) - .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0) - .put(SETTING_CREATION_DATE, System.currentTimeMillis())) - .putAlias(AliasMetaData.builder("dest-alias").build()) - .build(); - IndexMetaData aliasedDest = IndexMetaData.builder(ALIASED_DEST).settings(Settings.builder() - .put(SETTING_VERSION_CREATED, Version.CURRENT) - .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0) - .put(SETTING_CREATION_DATE, System.currentTimeMillis())) - .putAlias(AliasMetaData.builder("dest-alias").build()) - .build(); - ClusterState.Builder state = ClusterState.builder(new ClusterName("test")); - state.metaData(MetaData.builder() - .put(IndexMetaData.builder(source1).build(), false) - .put(IndexMetaData.builder(source2).build(), false) - .put(IndexMetaData.builder(aliasedDest).build(), false)); - CLUSTER_STATE = state.build(); - } - - public void testCheck_GivenSimpleSourceIndexAndValidDestIndex() { - TransformConfig config = createTransform(new SourceConfig(SOURCE_1), new DestConfig("dest", null)); - SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false); - } - - public void testCheck_GivenMissingConcreteSourceIndex() { - TransformConfig config = createTransform(new SourceConfig("missing"), new DestConfig("dest", null)); - - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false)); - assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); - assertThat(e.getMessage(), equalTo("Source index [missing] does not exist")); - SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true); - } - - public void testCheck_GivenMissingWildcardSourceIndex() { - TransformConfig config = createTransform(new SourceConfig("missing*"), new DestConfig("dest", null)); - - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false)); - assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); - assertThat(e.getMessage(), equalTo("Source index [missing*] does not exist")); - SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true); - } - - public void testCheck_GivenDestIndexSameAsSourceIndex() { - TransformConfig config = createTransform(new SourceConfig(SOURCE_1), new DestConfig("source-1", null)); - - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false)); - assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); - assertThat(e.getMessage(), equalTo("Destination index [source-1] is included in source expression [source-1]")); - SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true); - } - - public void testCheck_GivenDestIndexMatchesSourceIndex() { - TransformConfig config = createTransform(new SourceConfig("source-*"), new DestConfig(SOURCE_2, null)); - - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false)); - assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); - assertThat(e.getMessage(), equalTo("Destination index [source-2] is included in source expression [source-*]")); - SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true); - } - - public void testCheck_GivenDestIndexMatchesOneOfSourceIndices() { - TransformConfig config = createTransform(new SourceConfig("source-1", "source-*"), - new DestConfig(SOURCE_2, null)); - - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false)); - assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); - assertThat(e.getMessage(), equalTo("Destination index [source-2] is included in source expression [source-*]")); - SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true); - } - - public void testCheck_GivenDestIndexIsAliasThatMatchesMultipleIndices() { - TransformConfig config = createTransform(new SourceConfig(SOURCE_1), new DestConfig("dest-alias", null)); - - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false)); - assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); - assertThat(e.getMessage(), - equalTo("Destination index [dest-alias] should refer to a single index")); - - e = expectThrows(ElasticsearchStatusException.class, - () -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true)); - assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); - assertThat(e.getMessage(), - equalTo("Destination index [dest-alias] should refer to a single index")); - } - - public void testCheck_GivenDestIndexIsAliasThatIsIncludedInSource() { - TransformConfig config = createTransform(new SourceConfig(SOURCE_1), new DestConfig("source-1-alias", null)); - - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false)); - assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); - assertThat(e.getMessage(), - equalTo("Destination index [source-1] is included in source expression [source-1]")); - - SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true); - } - - private static TransformConfig createTransform(SourceConfig sourceConfig, DestConfig destConfig) { - return new TransformConfig("test", - sourceConfig, - destConfig, - TimeValue.timeValueSeconds(60), - null, - null, - PivotConfigTests.randomPivotConfig(), - null); - } -}