[Transform] refactor source and dest validation to support CCS (#50018)
refactors source and dest validation, adds support for CCS, makes resolve work like reindex/search, allow aliased dest index with a single write index. fixes #49988 fixes #49851 relates #43201
This commit is contained in:
parent
3cdc23ec9c
commit
de14092ad2
|
@ -300,10 +300,27 @@ public class IndexNameExpressionResolver {
|
|||
if (request.indices() == null || (request.indices() != null && request.indices().length != 1)) {
|
||||
throw new IllegalArgumentException("indices request must specify a single index expression");
|
||||
}
|
||||
Context context = new Context(state, request.indicesOptions(), false, true);
|
||||
Index[] indices = concreteIndices(context, request.indices()[0]);
|
||||
return concreteWriteIndex(state, request.indicesOptions(), request.indices()[0], false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method that allows to resolve an index expression to its corresponding single write index.
|
||||
*
|
||||
* @param state the cluster state containing all the data to resolve to expression to a concrete index
|
||||
* @param options defines how the aliases or indices need to be resolved to concrete indices
|
||||
* @param index index that can be resolved to alias or index name.
|
||||
* @param allowNoIndices whether to allow resolve to no index
|
||||
* @throws IllegalArgumentException if the index resolution does not lead to an index, or leads to more than one index
|
||||
* @return the write index obtained as a result of the index resolution or null if no index
|
||||
*/
|
||||
public Index concreteWriteIndex(ClusterState state, IndicesOptions options, String index, boolean allowNoIndices) {
|
||||
Context context = new Context(state, options, false, true);
|
||||
Index[] indices = concreteIndices(context, index);
|
||||
if (allowNoIndices && indices.length == 0) {
|
||||
return null;
|
||||
}
|
||||
if (indices.length != 1) {
|
||||
throw new IllegalArgumentException("The index expression [" + request.indices()[0] +
|
||||
throw new IllegalArgumentException("The index expression [" + index +
|
||||
"] and options provided did not point to a single write-index");
|
||||
}
|
||||
return indices[0];
|
||||
|
|
|
@ -19,6 +19,7 @@ import org.elasticsearch.protocol.xpack.license.LicenseStatus;
|
|||
import org.elasticsearch.transport.RemoteClusterAware;
|
||||
import org.elasticsearch.xpack.core.action.XPackInfoAction;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -229,7 +230,7 @@ public final class RemoteClusterLicenseChecker {
|
|||
* @param indices the collection of index names
|
||||
* @return true if the collection of index names contains a name that represents a remote index, otherwise false
|
||||
*/
|
||||
public static boolean containsRemoteIndex(final List<String> indices) {
|
||||
public static boolean containsRemoteIndex(final Collection<String> indices) {
|
||||
return indices.stream().anyMatch(RemoteClusterLicenseChecker::isRemoteIndex);
|
||||
}
|
||||
|
||||
|
@ -240,7 +241,7 @@ public final class RemoteClusterLicenseChecker {
|
|||
* @param indices the collection of index names
|
||||
* @return list of index names that represent remote index names
|
||||
*/
|
||||
public static List<String> remoteIndices(final List<String> indices) {
|
||||
public static List<String> remoteIndices(final Collection<String> indices) {
|
||||
return indices.stream().filter(RemoteClusterLicenseChecker::isRemoteIndex).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
|
|
|
@ -611,6 +611,11 @@ public class XPackLicenseState {
|
|||
return status.active;
|
||||
}
|
||||
|
||||
public static boolean isTransformAllowedForOperationMode(final OperationMode operationMode) {
|
||||
// any license (basic and upwards)
|
||||
return operationMode != License.OperationMode.MISSING;
|
||||
}
|
||||
|
||||
/**
|
||||
* Rollup is always available as long as there is a valid license
|
||||
*
|
||||
|
|
|
@ -0,0 +1,460 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.core.common.validation;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.ValidationException;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.indices.InvalidIndexNameException;
|
||||
import org.elasticsearch.license.RemoteClusterLicenseChecker;
|
||||
import org.elasticsearch.protocol.xpack.license.LicenseStatus;
|
||||
import org.elasticsearch.transport.NoSuchRemoteClusterException;
|
||||
import org.elasticsearch.transport.RemoteClusterService;
|
||||
|
||||
import java.text.MessageFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Set;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||
import static org.elasticsearch.cluster.metadata.MetaDataCreateIndexService.validateIndexOrAliasName;
|
||||
|
||||
/**
|
||||
* Validation of source indexes and destination index.
|
||||
*
|
||||
* Validations are separated into validators to choose from, e.g. you want to run different types of validations for
|
||||
* preview/create/start with or without support for remote clusters
|
||||
*/
|
||||
public final class SourceDestValidator {
|
||||
|
||||
// messages
|
||||
public static final String SOURCE_INDEX_MISSING = "Source index [{0}] does not exist";
|
||||
public static final String SOURCE_LOWERCASE = "Source index [{0}] must be lowercase";
|
||||
public static final String DEST_IN_SOURCE = "Destination index [{0}] is included in source expression [{1}]";
|
||||
public static final String DEST_LOWERCASE = "Destination index [{0}] must be lowercase";
|
||||
public static final String NEEDS_REMOTE_CLUSTER_SEARCH = "Source index is configured with a remote index pattern(s) [{0}]"
|
||||
+ " but the current node [{1}] is not allowed to connect to remote clusters."
|
||||
+ " Please enable cluster.remote.connect for all data nodes.";
|
||||
public static final String ERROR_REMOTE_CLUSTER_SEARCH = "Error resolving remote source: {0}";
|
||||
public static final String UNKNOWN_REMOTE_CLUSTER_LICENSE = "Error during license check ({0}) for remote cluster "
|
||||
+ "alias(es) {1}, error: {2}";
|
||||
public static final String FEATURE_NOT_LICENSED_REMOTE_CLUSTER_LICENSE = "License check failed for remote cluster "
|
||||
+ "alias [{0}], at least a [{1}] license is required, found license [{2}]";
|
||||
public static final String REMOTE_CLUSTER_LICENSE_INACTIVE = "License check failed for remote cluster "
|
||||
+ "alias [{0}], license is not active";
|
||||
|
||||
private final IndexNameExpressionResolver indexNameExpressionResolver;
|
||||
private final RemoteClusterService remoteClusterService;
|
||||
private final RemoteClusterLicenseChecker remoteClusterLicenseChecker;
|
||||
private final String nodeName;
|
||||
private final String license;
|
||||
|
||||
/*
|
||||
* Internal shared context between validators.
|
||||
*/
|
||||
static class Context {
|
||||
private final ClusterState state;
|
||||
private final IndexNameExpressionResolver indexNameExpressionResolver;
|
||||
private final RemoteClusterService remoteClusterService;
|
||||
private final RemoteClusterLicenseChecker remoteClusterLicenseChecker;
|
||||
private final String[] source;
|
||||
private final String dest;
|
||||
private final String nodeName;
|
||||
private final String license;
|
||||
|
||||
private ValidationException validationException = null;
|
||||
private SortedSet<String> resolvedSource = null;
|
||||
private SortedSet<String> resolvedRemoteSource = null;
|
||||
private String resolvedDest = null;
|
||||
|
||||
Context(
|
||||
final ClusterState state,
|
||||
final IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
final RemoteClusterService remoteClusterService,
|
||||
final RemoteClusterLicenseChecker remoteClusterLicenseChecker,
|
||||
final String[] source,
|
||||
final String dest,
|
||||
final String nodeName,
|
||||
final String license
|
||||
) {
|
||||
this.state = state;
|
||||
this.indexNameExpressionResolver = indexNameExpressionResolver;
|
||||
this.remoteClusterService = remoteClusterService;
|
||||
this.remoteClusterLicenseChecker = remoteClusterLicenseChecker;
|
||||
this.source = source;
|
||||
this.dest = dest;
|
||||
this.nodeName = nodeName;
|
||||
this.license = license;
|
||||
}
|
||||
|
||||
public ClusterState getState() {
|
||||
return state;
|
||||
}
|
||||
|
||||
public RemoteClusterService getRemoteClusterService() {
|
||||
return remoteClusterService;
|
||||
}
|
||||
|
||||
public RemoteClusterLicenseChecker getRemoteClusterLicenseChecker() {
|
||||
return remoteClusterLicenseChecker;
|
||||
}
|
||||
|
||||
public IndexNameExpressionResolver getIndexNameExpressionResolver() {
|
||||
return indexNameExpressionResolver;
|
||||
}
|
||||
|
||||
public boolean isRemoteSearchEnabled() {
|
||||
return remoteClusterLicenseChecker != null;
|
||||
}
|
||||
|
||||
public String[] getSource() {
|
||||
return source;
|
||||
}
|
||||
|
||||
public String getDest() {
|
||||
return dest;
|
||||
}
|
||||
|
||||
public String getNodeName() {
|
||||
return nodeName;
|
||||
}
|
||||
|
||||
public String getLicense() {
|
||||
return license;
|
||||
}
|
||||
|
||||
public SortedSet<String> resolveSource() {
|
||||
if (resolvedSource == null) {
|
||||
resolveLocalAndRemoteSource();
|
||||
}
|
||||
|
||||
return resolvedSource;
|
||||
}
|
||||
|
||||
public SortedSet<String> resolveRemoteSource() {
|
||||
if (resolvedRemoteSource == null) {
|
||||
resolveLocalAndRemoteSource();
|
||||
}
|
||||
|
||||
return resolvedRemoteSource;
|
||||
}
|
||||
|
||||
public String resolveDest() {
|
||||
if (resolvedDest == null) {
|
||||
try {
|
||||
Index singleWriteIndex = indexNameExpressionResolver.concreteWriteIndex(
|
||||
state,
|
||||
IndicesOptions.lenientExpandOpen(),
|
||||
dest,
|
||||
true
|
||||
);
|
||||
|
||||
resolvedDest = singleWriteIndex != null ? singleWriteIndex.getName() : dest;
|
||||
} catch (IllegalArgumentException e) {
|
||||
// stop here as we can not return a single dest index
|
||||
addValidationError(e.getMessage());
|
||||
throw validationException;
|
||||
}
|
||||
}
|
||||
|
||||
return resolvedDest;
|
||||
}
|
||||
|
||||
public ValidationException addValidationError(String error, Object... args) {
|
||||
if (validationException == null) {
|
||||
validationException = new ValidationException();
|
||||
}
|
||||
|
||||
validationException.addValidationError(getMessage(error, args));
|
||||
|
||||
return validationException;
|
||||
}
|
||||
|
||||
public ValidationException getValidationException() {
|
||||
return validationException;
|
||||
}
|
||||
|
||||
// convenience method to make testing easier
|
||||
public Set<String> getRegisteredRemoteClusterNames() {
|
||||
return remoteClusterService.getRegisteredRemoteClusterNames();
|
||||
}
|
||||
|
||||
private void resolveLocalAndRemoteSource() {
|
||||
resolvedSource = new TreeSet<>(Arrays.asList(source));
|
||||
resolvedRemoteSource = new TreeSet<>(RemoteClusterLicenseChecker.remoteIndices(resolvedSource));
|
||||
resolvedSource.removeAll(resolvedRemoteSource);
|
||||
|
||||
// special case: if indexNameExpressionResolver gets an empty list it treats it as _all
|
||||
if (resolvedSource.isEmpty() == false) {
|
||||
resolvedSource = new TreeSet<>(
|
||||
Arrays.asList(
|
||||
indexNameExpressionResolver.concreteIndexNames(
|
||||
state,
|
||||
DEFAULT_INDICES_OPTIONS_FOR_VALIDATION,
|
||||
resolvedSource.toArray(new String[0])
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
interface SourceDestValidation {
|
||||
void validate(Context context, ActionListener<Context> listener);
|
||||
}
|
||||
|
||||
// note: this is equivalent to the default for search requests
|
||||
private static final IndicesOptions DEFAULT_INDICES_OPTIONS_FOR_VALIDATION = IndicesOptions
|
||||
.strictExpandOpenAndForbidClosedIgnoreThrottled();
|
||||
|
||||
public static final SourceDestValidation SOURCE_MISSING_VALIDATION = new SourceMissingValidation();
|
||||
public static final SourceDestValidation REMOTE_SOURCE_VALIDATION = new RemoteSourceEnabledAndRemoteLicenseValidation();
|
||||
public static final SourceDestValidation DESTINATION_IN_SOURCE_VALIDATION = new DestinationInSourceValidation();
|
||||
public static final SourceDestValidation DESTINATION_SINGLE_INDEX_VALIDATION = new DestinationSingleIndexValidation();
|
||||
|
||||
// set of default validation collections, if you want to automatically benefit from new validators, use those
|
||||
public static final List<SourceDestValidation> PREVIEW_VALIDATIONS = Arrays.asList(SOURCE_MISSING_VALIDATION, REMOTE_SOURCE_VALIDATION);
|
||||
|
||||
public static final List<SourceDestValidation> ALL_VALIDATIONS = Arrays.asList(
|
||||
SOURCE_MISSING_VALIDATION,
|
||||
REMOTE_SOURCE_VALIDATION,
|
||||
DESTINATION_IN_SOURCE_VALIDATION,
|
||||
DESTINATION_SINGLE_INDEX_VALIDATION
|
||||
);
|
||||
|
||||
public static final List<SourceDestValidation> NON_DEFERABLE_VALIDATIONS = Arrays.asList(DESTINATION_SINGLE_INDEX_VALIDATION);
|
||||
|
||||
/**
|
||||
* Create a new Source Dest Validator
|
||||
*
|
||||
* @param indexNameExpressionResolver A valid IndexNameExpressionResolver object
|
||||
* @param remoteClusterService A valid RemoteClusterService object
|
||||
* @param remoteClusterLicenseChecker A RemoteClusterLicenseChecker or null if CCS is disabled
|
||||
* @param nodeName the name of this node
|
||||
* @param license the license of the feature validated for
|
||||
*/
|
||||
public SourceDestValidator(
|
||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
RemoteClusterService remoteClusterService,
|
||||
RemoteClusterLicenseChecker remoteClusterLicenseChecker,
|
||||
String nodeName,
|
||||
String license
|
||||
) {
|
||||
this.indexNameExpressionResolver = indexNameExpressionResolver;
|
||||
this.remoteClusterService = remoteClusterService;
|
||||
this.remoteClusterLicenseChecker = remoteClusterLicenseChecker;
|
||||
this.nodeName = nodeName;
|
||||
this.license = license;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run validation against source and dest.
|
||||
*
|
||||
* @param clusterState The current ClusterState
|
||||
* @param source an array of source indexes
|
||||
* @param dest destination index
|
||||
* @param validations list of of validations to run
|
||||
* @param listener result listener
|
||||
*/
|
||||
public void validate(
|
||||
final ClusterState clusterState,
|
||||
final String[] source,
|
||||
final String dest,
|
||||
final List<SourceDestValidation> validations,
|
||||
final ActionListener<Boolean> listener
|
||||
) {
|
||||
Context context = new Context(
|
||||
clusterState,
|
||||
indexNameExpressionResolver,
|
||||
remoteClusterService,
|
||||
remoteClusterLicenseChecker,
|
||||
source,
|
||||
dest,
|
||||
nodeName,
|
||||
license
|
||||
);
|
||||
|
||||
ActionListener<Context> validationListener = ActionListener.wrap(c -> {
|
||||
if (c.getValidationException() != null) {
|
||||
listener.onFailure(c.getValidationException());
|
||||
} else {
|
||||
listener.onResponse(true);
|
||||
}
|
||||
}, listener::onFailure);
|
||||
|
||||
for (int i = validations.size() - 1; i >= 0; i--) {
|
||||
final SourceDestValidation validation = validations.get(i);
|
||||
final ActionListener<Context> previousValidationListener = validationListener;
|
||||
validationListener = ActionListener.wrap(c -> { validation.validate(c, previousValidationListener); }, listener::onFailure);
|
||||
}
|
||||
|
||||
validationListener.onResponse(context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate dest request.
|
||||
*
|
||||
* This runs a couple of simple validations at request time, to be executed from a {@link ActionRequest}}
|
||||
* implementation.
|
||||
*
|
||||
* Note: Source can not be validated at request time as it might contain expressions.
|
||||
*
|
||||
* @param validationException an ActionRequestValidationException for collection validation problem, can be null
|
||||
* @param dest destination index, null if validation shall be skipped
|
||||
*/
|
||||
public static ActionRequestValidationException validateRequest(
|
||||
@Nullable ActionRequestValidationException validationException,
|
||||
@Nullable String dest
|
||||
) {
|
||||
try {
|
||||
if (dest != null) {
|
||||
validateIndexOrAliasName(dest, InvalidIndexNameException::new);
|
||||
if (dest.toLowerCase(Locale.ROOT).equals(dest) == false) {
|
||||
validationException = addValidationError(getMessage(DEST_LOWERCASE, dest), validationException);
|
||||
}
|
||||
}
|
||||
} catch (InvalidIndexNameException ex) {
|
||||
validationException = addValidationError(ex.getMessage(), validationException);
|
||||
}
|
||||
|
||||
return validationException;
|
||||
}
|
||||
|
||||
static class SourceMissingValidation implements SourceDestValidation {
|
||||
|
||||
@Override
|
||||
public void validate(Context context, ActionListener<Context> listener) {
|
||||
try {
|
||||
// non-trivia: if source contains a wildcard index, which does not resolve to a concrete index
|
||||
// the resolved indices might be empty, but we can check if source contained something, this works because
|
||||
// of no wildcard index is involved the resolve would have thrown an exception
|
||||
if (context.resolveSource().isEmpty() && context.resolveRemoteSource().isEmpty() && context.getSource().length == 0) {
|
||||
context.addValidationError(SOURCE_INDEX_MISSING, Strings.arrayToCommaDelimitedString(context.getSource()));
|
||||
}
|
||||
} catch (IndexNotFoundException e) {
|
||||
context.addValidationError(e.getMessage());
|
||||
}
|
||||
listener.onResponse(context);
|
||||
}
|
||||
}
|
||||
|
||||
static class RemoteSourceEnabledAndRemoteLicenseValidation implements SourceDestValidation {
|
||||
@Override
|
||||
public void validate(Context context, ActionListener<Context> listener) {
|
||||
if (context.resolveRemoteSource().isEmpty()) {
|
||||
listener.onResponse(context);
|
||||
return;
|
||||
}
|
||||
|
||||
List<String> remoteIndices = new ArrayList<>(context.resolveRemoteSource());
|
||||
// we can only check this node at the moment, clusters with mixed CCS enabled/disabled nodes are not supported,
|
||||
// see gh#50033
|
||||
if (context.isRemoteSearchEnabled() == false) {
|
||||
context.addValidationError(NEEDS_REMOTE_CLUSTER_SEARCH, context.resolveRemoteSource(), context.getNodeName());
|
||||
listener.onResponse(context);
|
||||
return;
|
||||
}
|
||||
|
||||
// this can throw
|
||||
List<String> remoteAliases;
|
||||
try {
|
||||
remoteAliases = RemoteClusterLicenseChecker.remoteClusterAliases(context.getRegisteredRemoteClusterNames(), remoteIndices);
|
||||
} catch (NoSuchRemoteClusterException e) {
|
||||
context.addValidationError(e.getMessage());
|
||||
listener.onResponse(context);
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
context.addValidationError(ERROR_REMOTE_CLUSTER_SEARCH, e.getMessage());
|
||||
listener.onResponse(context);
|
||||
return;
|
||||
}
|
||||
|
||||
context.getRemoteClusterLicenseChecker().checkRemoteClusterLicenses(remoteAliases, ActionListener.wrap(response -> {
|
||||
if (response.isSuccess() == false) {
|
||||
if (response.remoteClusterLicenseInfo().licenseInfo().getStatus() != LicenseStatus.ACTIVE) {
|
||||
context.addValidationError(REMOTE_CLUSTER_LICENSE_INACTIVE, response.remoteClusterLicenseInfo().clusterAlias());
|
||||
} else {
|
||||
context.addValidationError(
|
||||
FEATURE_NOT_LICENSED_REMOTE_CLUSTER_LICENSE,
|
||||
response.remoteClusterLicenseInfo().clusterAlias(),
|
||||
context.getLicense(),
|
||||
response.remoteClusterLicenseInfo().licenseInfo().getType()
|
||||
);
|
||||
}
|
||||
}
|
||||
listener.onResponse(context);
|
||||
}, e -> {
|
||||
context.addValidationError(UNKNOWN_REMOTE_CLUSTER_LICENSE, context.getLicense(), remoteAliases, e.getMessage());
|
||||
listener.onResponse(context);
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
static class DestinationInSourceValidation implements SourceDestValidation {
|
||||
|
||||
@Override
|
||||
public void validate(Context context, ActionListener<Context> listener) {
|
||||
final String destIndex = context.getDest();
|
||||
boolean foundSourceInDest = false;
|
||||
|
||||
for (String src : context.getSource()) {
|
||||
if (Regex.simpleMatch(src, destIndex)) {
|
||||
context.addValidationError(DEST_IN_SOURCE, destIndex, src);
|
||||
// do not return immediately but collect all errors and than return
|
||||
foundSourceInDest = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (foundSourceInDest) {
|
||||
listener.onResponse(context);
|
||||
return;
|
||||
}
|
||||
|
||||
if (context.resolvedSource.contains(destIndex)) {
|
||||
context.addValidationError(DEST_IN_SOURCE, destIndex, Strings.arrayToCommaDelimitedString(context.getSource()));
|
||||
listener.onResponse(context);
|
||||
return;
|
||||
}
|
||||
|
||||
if (context.resolvedSource.contains(context.resolveDest())) {
|
||||
context.addValidationError(
|
||||
DEST_IN_SOURCE,
|
||||
context.resolveDest(),
|
||||
Strings.collectionToCommaDelimitedString(context.resolveSource())
|
||||
);
|
||||
}
|
||||
|
||||
listener.onResponse(context);
|
||||
}
|
||||
}
|
||||
|
||||
static class DestinationSingleIndexValidation implements SourceDestValidation {
|
||||
|
||||
@Override
|
||||
public void validate(Context context, ActionListener<Context> listener) {
|
||||
context.resolveDest();
|
||||
listener.onResponse(context);
|
||||
}
|
||||
}
|
||||
|
||||
private static String getMessage(String message, Object... args) {
|
||||
return new MessageFormat(message, Locale.ROOT).format(args);
|
||||
}
|
||||
}
|
|
@ -21,9 +21,6 @@ public class TransformMessages {
|
|||
"Failed to validate configuration";
|
||||
public static final String REST_PUT_FAILED_PERSIST_TRANSFORM_CONFIGURATION = "Failed to persist transform configuration";
|
||||
public static final String REST_PUT_TRANSFORM_FAILED_TO_DEDUCE_DEST_MAPPINGS = "Failed to deduce dest mappings";
|
||||
public static final String REST_PUT_TRANSFORM_SOURCE_INDEX_MISSING = "Source index [{0}] does not exist";
|
||||
public static final String REST_PUT_TRANSFORM_DEST_IN_SOURCE = "Destination index [{0}] is included in source expression [{1}]";
|
||||
public static final String REST_PUT_TRANSFORM_DEST_SINGLE_INDEX = "Destination index [{0}] should refer to a single index";
|
||||
public static final String REST_PUT_TRANSFORM_INCONSISTENT_ID =
|
||||
"Inconsistent id; ''{0}'' specified in the body differs from ''{1}'' specified as a URL argument";
|
||||
public static final String TRANSFORM_CONFIG_INVALID = "Transform configuration is invalid [{0}]";
|
||||
|
|
|
@ -22,6 +22,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
|||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
|
||||
import org.elasticsearch.xpack.core.transform.TransformField;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.DestConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
||||
|
@ -67,7 +68,7 @@ public class PreviewTransformAction extends ActionType<PreviewTransformAction.Re
|
|||
Object providedDestination = content.get(TransformField.DESTINATION.getPreferredName());
|
||||
if (providedDestination instanceof Map) {
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, String> destMap = (Map<String, String>)providedDestination;
|
||||
Map<String, String> destMap = (Map<String, String>) providedDestination;
|
||||
String pipeline = destMap.get(DestConfig.PIPELINE.getPreferredName());
|
||||
if (pipeline != null) {
|
||||
tempDestination.put(DestConfig.PIPELINE.getPreferredName(), pipeline);
|
||||
|
@ -75,12 +76,15 @@ public class PreviewTransformAction extends ActionType<PreviewTransformAction.Re
|
|||
}
|
||||
content.put(TransformField.DESTINATION.getPreferredName(), tempDestination);
|
||||
content.put(TransformField.ID.getPreferredName(), "transform-preview");
|
||||
try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().map(content);
|
||||
XContentParser newParser = XContentType.JSON
|
||||
.xContent()
|
||||
.createParser(parser.getXContentRegistry(),
|
||||
try (
|
||||
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().map(content);
|
||||
XContentParser newParser = XContentType.JSON.xContent()
|
||||
.createParser(
|
||||
parser.getXContentRegistry(),
|
||||
LoggingDeprecationHandler.INSTANCE,
|
||||
BytesReference.bytes(xContentBuilder).streamInput())) {
|
||||
BytesReference.bytes(xContentBuilder).streamInput()
|
||||
)
|
||||
) {
|
||||
return new Request(TransformConfig.fromXContent(newParser, "transform-preview", false));
|
||||
}
|
||||
}
|
||||
|
@ -88,15 +92,20 @@ public class PreviewTransformAction extends ActionType<PreviewTransformAction.Re
|
|||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
ActionRequestValidationException validationException = null;
|
||||
if(config.getPivotConfig() != null) {
|
||||
for(String failure : config.getPivotConfig().aggFieldValidation()) {
|
||||
if (config.getPivotConfig() != null) {
|
||||
for (String failure : config.getPivotConfig().aggFieldValidation()) {
|
||||
validationException = addValidationError(failure, validationException);
|
||||
}
|
||||
}
|
||||
|
||||
validationException = SourceDestValidator.validateRequest(
|
||||
validationException,
|
||||
config.getDestination() != null ? config.getDestination().getIndex() : null
|
||||
);
|
||||
|
||||
return validationException;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
return this.config.toXContent(builder, params);
|
||||
|
@ -142,6 +151,7 @@ public class PreviewTransformAction extends ActionType<PreviewTransformAction.Re
|
|||
PARSER.declareObjectArray(Response::setDocs, (p, c) -> p.mapOrdered(), PREVIEW);
|
||||
PARSER.declareObject(Response::setMappings, (p, c) -> p.mapOrdered(), MAPPINGS);
|
||||
}
|
||||
|
||||
public Response() {}
|
||||
|
||||
public Response(StreamInput in) throws IOException {
|
||||
|
|
|
@ -15,18 +15,16 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.indices.InvalidIndexNameException;
|
||||
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
|
||||
import org.elasticsearch.xpack.core.transform.TransformField;
|
||||
import org.elasticsearch.xpack.core.transform.TransformMessages;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
||||
import org.elasticsearch.xpack.core.transform.utils.TransformStrings;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Locale;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||
import static org.elasticsearch.cluster.metadata.MetaDataCreateIndexService.validateIndexOrAliasName;
|
||||
|
||||
public class PutTransformAction extends ActionType<AcknowledgedResponse> {
|
||||
|
||||
|
@ -71,46 +69,46 @@ public class PutTransformAction extends ActionType<AcknowledgedResponse> {
|
|||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
ActionRequestValidationException validationException = null;
|
||||
if(config.getPivotConfig() != null
|
||||
if (config.getPivotConfig() != null
|
||||
&& config.getPivotConfig().getMaxPageSearchSize() != null
|
||||
&& (config.getPivotConfig().getMaxPageSearchSize() < 10 || config.getPivotConfig().getMaxPageSearchSize() > 10_000)) {
|
||||
validationException = addValidationError(
|
||||
"pivot.max_page_search_size [" +
|
||||
config.getPivotConfig().getMaxPageSearchSize() + "] must be greater than 10 and less than 10,000",
|
||||
validationException);
|
||||
"pivot.max_page_search_size ["
|
||||
+ config.getPivotConfig().getMaxPageSearchSize()
|
||||
+ "] must be greater than 10 and less than 10,000",
|
||||
validationException
|
||||
);
|
||||
}
|
||||
for(String failure : config.getPivotConfig().aggFieldValidation()) {
|
||||
for (String failure : config.getPivotConfig().aggFieldValidation()) {
|
||||
validationException = addValidationError(failure, validationException);
|
||||
}
|
||||
String destIndex = config.getDestination().getIndex();
|
||||
try {
|
||||
validateIndexOrAliasName(destIndex, InvalidIndexNameException::new);
|
||||
if (!destIndex.toLowerCase(Locale.ROOT).equals(destIndex)) {
|
||||
validationException = addValidationError("dest.index [" + destIndex +"] must be lowercase", validationException);
|
||||
}
|
||||
} catch (InvalidIndexNameException ex) {
|
||||
validationException = addValidationError(ex.getMessage(), validationException);
|
||||
}
|
||||
|
||||
validationException = SourceDestValidator.validateRequest(validationException, config.getDestination().getIndex());
|
||||
|
||||
if (TransformStrings.isValidId(config.getId()) == false) {
|
||||
validationException = addValidationError(
|
||||
TransformMessages.getMessage(TransformMessages.INVALID_ID, TransformField.ID.getPreferredName(), config.getId()),
|
||||
validationException);
|
||||
validationException
|
||||
);
|
||||
}
|
||||
if (TransformStrings.hasValidLengthForId(config.getId()) == false) {
|
||||
validationException = addValidationError(
|
||||
TransformMessages.getMessage(TransformMessages.ID_TOO_LONG, TransformStrings.ID_LENGTH_LIMIT),
|
||||
validationException);
|
||||
validationException
|
||||
);
|
||||
}
|
||||
TimeValue frequency = config.getFrequency();
|
||||
if (frequency != null) {
|
||||
if (frequency.compareTo(MIN_FREQUENCY) < 0) {
|
||||
validationException = addValidationError(
|
||||
"minimum permitted [" + TransformField.FREQUENCY + "] is [" + MIN_FREQUENCY.getStringRep() + "]",
|
||||
validationException);
|
||||
validationException
|
||||
);
|
||||
} else if (frequency.compareTo(MAX_FREQUENCY) > 0) {
|
||||
validationException = addValidationError(
|
||||
"highest permitted [" + TransformField.FREQUENCY + "] is [" + MAX_FREQUENCY.getStringRep() + "]",
|
||||
validationException);
|
||||
validationException
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -16,17 +16,15 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.indices.InvalidIndexNameException;
|
||||
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
|
||||
import org.elasticsearch.xpack.core.transform.TransformField;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Locale;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||
import static org.elasticsearch.cluster.metadata.MetaDataCreateIndexService.validateIndexOrAliasName;
|
||||
|
||||
public class UpdateTransformAction extends ActionType<UpdateTransformAction.Response> {
|
||||
|
||||
|
@ -46,7 +44,7 @@ public class UpdateTransformAction extends ActionType<UpdateTransformAction.Resp
|
|||
private final String id;
|
||||
private final boolean deferValidation;
|
||||
|
||||
public Request(TransformConfigUpdate update, String id, boolean deferValidation) {
|
||||
public Request(TransformConfigUpdate update, String id, boolean deferValidation) {
|
||||
this.update = update;
|
||||
this.id = id;
|
||||
this.deferValidation = deferValidation;
|
||||
|
@ -70,27 +68,24 @@ public class UpdateTransformAction extends ActionType<UpdateTransformAction.Resp
|
|||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
ActionRequestValidationException validationException = null;
|
||||
|
||||
if (update.getDestination() != null && update.getDestination().getIndex() != null) {
|
||||
String destIndex = update.getDestination().getIndex();
|
||||
try {
|
||||
validateIndexOrAliasName(destIndex, InvalidIndexNameException::new);
|
||||
if (!destIndex.toLowerCase(Locale.ROOT).equals(destIndex)) {
|
||||
validationException = addValidationError("dest.index [" + destIndex + "] must be lowercase", validationException);
|
||||
}
|
||||
} catch (InvalidIndexNameException ex) {
|
||||
validationException = addValidationError(ex.getMessage(), validationException);
|
||||
}
|
||||
|
||||
validationException = SourceDestValidator.validateRequest(validationException, update.getDestination().getIndex());
|
||||
}
|
||||
|
||||
TimeValue frequency = update.getFrequency();
|
||||
if (frequency != null) {
|
||||
if (frequency.compareTo(MIN_FREQUENCY) < 0) {
|
||||
validationException = addValidationError(
|
||||
"minimum permitted [" + TransformField.FREQUENCY + "] is [" + MIN_FREQUENCY.getStringRep() + "]",
|
||||
validationException);
|
||||
validationException
|
||||
);
|
||||
} else if (frequency.compareTo(MAX_FREQUENCY) > 0) {
|
||||
validationException = addValidationError(
|
||||
"highest permitted [" + TransformField.FREQUENCY + "] is [" + MAX_FREQUENCY.getStringRep() + "]",
|
||||
validationException);
|
||||
validationException
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -131,9 +126,7 @@ public class UpdateTransformAction extends ActionType<UpdateTransformAction.Resp
|
|||
return false;
|
||||
}
|
||||
Request other = (Request) obj;
|
||||
return Objects.equals(update, other.update) &&
|
||||
this.deferValidation == other.deferValidation &&
|
||||
this.id.equals(other.id);
|
||||
return Objects.equals(update, other.update) && this.deferValidation == other.deferValidation && this.id.equals(other.id);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,822 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.common.validation;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.ActionType;
|
||||
import org.elasticsearch.action.LatchedActionListener;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.CheckedConsumer;
|
||||
import org.elasticsearch.common.ValidationException;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.license.License;
|
||||
import org.elasticsearch.license.RemoteClusterLicenseChecker;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
|
||||
import org.elasticsearch.protocol.xpack.XPackInfoResponse;
|
||||
import org.elasticsearch.protocol.xpack.XPackInfoResponse.LicenseInfo;
|
||||
import org.elasticsearch.protocol.xpack.license.LicenseStatus;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.client.NoOpClient;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.RemoteClusterService;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator.Context;
|
||||
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator.RemoteSourceEnabledAndRemoteLicenseValidation;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
|
||||
import static org.elasticsearch.mock.orig.Mockito.when;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
public class SourceDestValidatorTests extends ESTestCase {
|
||||
|
||||
private static final String SOURCE_1 = "source-1";
|
||||
private static final String SOURCE_1_ALIAS = "source-1-alias";
|
||||
private static final String SOURCE_2 = "source-2";
|
||||
private static final String DEST_ALIAS = "dest-alias";
|
||||
private static final String ALIASED_DEST = "aliased-dest";
|
||||
private static final String ALIAS_READ_WRITE_DEST = "alias-read-write-dest";
|
||||
private static final String REMOTE_BASIC = "remote-basic";
|
||||
private static final String REMOTE_PLATINUM = "remote-platinum";
|
||||
|
||||
private static final ClusterState CLUSTER_STATE;
|
||||
private Client clientWithBasicLicense;
|
||||
private Client clientWithExpiredBasicLicense;
|
||||
private Client clientWithPlatinumLicense;
|
||||
private Client clientWithTrialLicense;
|
||||
private RemoteClusterLicenseChecker remoteClusterLicenseCheckerBasic;
|
||||
|
||||
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
|
||||
private final TransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool);
|
||||
private final RemoteClusterService remoteClusterService = transportService.getRemoteClusterService();
|
||||
private final SourceDestValidator simpleNonRemoteValidator = new SourceDestValidator(
|
||||
new IndexNameExpressionResolver(),
|
||||
remoteClusterService,
|
||||
null,
|
||||
"node_id",
|
||||
"license"
|
||||
);
|
||||
|
||||
static {
|
||||
IndexMetaData source1 = IndexMetaData.builder(SOURCE_1)
|
||||
.settings(
|
||||
Settings.builder()
|
||||
.put(SETTING_VERSION_CREATED, Version.CURRENT)
|
||||
.put(SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(SETTING_CREATION_DATE, System.currentTimeMillis())
|
||||
)
|
||||
.putAlias(AliasMetaData.builder(SOURCE_1_ALIAS).build())
|
||||
.putAlias(AliasMetaData.builder(ALIAS_READ_WRITE_DEST).writeIndex(false).build())
|
||||
.build();
|
||||
IndexMetaData source2 = IndexMetaData.builder(SOURCE_2)
|
||||
.settings(
|
||||
Settings.builder()
|
||||
.put(SETTING_VERSION_CREATED, Version.CURRENT)
|
||||
.put(SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(SETTING_CREATION_DATE, System.currentTimeMillis())
|
||||
)
|
||||
.putAlias(AliasMetaData.builder(DEST_ALIAS).build())
|
||||
.putAlias(AliasMetaData.builder(ALIAS_READ_WRITE_DEST).writeIndex(false).build())
|
||||
.build();
|
||||
IndexMetaData aliasedDest = IndexMetaData.builder(ALIASED_DEST)
|
||||
.settings(
|
||||
Settings.builder()
|
||||
.put(SETTING_VERSION_CREATED, Version.CURRENT)
|
||||
.put(SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(SETTING_CREATION_DATE, System.currentTimeMillis())
|
||||
)
|
||||
.putAlias(AliasMetaData.builder(DEST_ALIAS).build())
|
||||
.putAlias(AliasMetaData.builder(ALIAS_READ_WRITE_DEST).build())
|
||||
.build();
|
||||
ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
|
||||
state.metaData(
|
||||
MetaData.builder()
|
||||
.put(IndexMetaData.builder(source1).build(), false)
|
||||
.put(IndexMetaData.builder(source2).build(), false)
|
||||
.put(IndexMetaData.builder(aliasedDest).build(), false)
|
||||
);
|
||||
CLUSTER_STATE = state.build();
|
||||
}
|
||||
|
||||
private class MockClientLicenseCheck extends NoOpClient {
|
||||
private final String license;
|
||||
private final LicenseStatus licenseStatus;
|
||||
|
||||
MockClientLicenseCheck(String testName, String license, LicenseStatus licenseStatus) {
|
||||
super(testName);
|
||||
this.license = license;
|
||||
this.licenseStatus = licenseStatus;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Client getRemoteClusterClient(String clusterAlias) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
|
||||
ActionType<Response> action,
|
||||
Request request,
|
||||
ActionListener<Response> listener
|
||||
) {
|
||||
if (request instanceof XPackInfoRequest) {
|
||||
XPackInfoResponse response = new XPackInfoResponse(
|
||||
null,
|
||||
new LicenseInfo("uid", license, license, licenseStatus, randomNonNegativeLong()),
|
||||
null
|
||||
);
|
||||
listener.onResponse((Response) response);
|
||||
return;
|
||||
}
|
||||
super.doExecute(action, request, listener);
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setupComponents() {
|
||||
clientWithBasicLicense = new MockClientLicenseCheck(getTestName(), "basic", LicenseStatus.ACTIVE);
|
||||
clientWithExpiredBasicLicense = new MockClientLicenseCheck(getTestName(), "basic", LicenseStatus.EXPIRED);
|
||||
remoteClusterLicenseCheckerBasic = new RemoteClusterLicenseChecker(
|
||||
clientWithBasicLicense,
|
||||
(operationMode -> operationMode != License.OperationMode.MISSING)
|
||||
);
|
||||
clientWithPlatinumLicense = new MockClientLicenseCheck(getTestName(), "platinum", LicenseStatus.ACTIVE);
|
||||
clientWithTrialLicense = new MockClientLicenseCheck(getTestName(), "trial", LicenseStatus.ACTIVE);
|
||||
}
|
||||
|
||||
@After
|
||||
public void closeComponents() throws Exception {
|
||||
clientWithBasicLicense.close();
|
||||
clientWithExpiredBasicLicense.close();
|
||||
clientWithPlatinumLicense.close();
|
||||
clientWithTrialLicense.close();
|
||||
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public void testCheck_GivenSimpleSourceIndexAndValidDestIndex() throws InterruptedException {
|
||||
assertValidation(
|
||||
listener -> simpleNonRemoteValidator.validate(
|
||||
CLUSTER_STATE,
|
||||
new String[] { SOURCE_1 },
|
||||
"dest",
|
||||
SourceDestValidator.ALL_VALIDATIONS,
|
||||
listener
|
||||
),
|
||||
true,
|
||||
null
|
||||
);
|
||||
}
|
||||
|
||||
public void testCheck_GivenNoSourceIndexAndValidDestIndex() throws InterruptedException {
|
||||
assertValidation(
|
||||
listener -> simpleNonRemoteValidator.validate(
|
||||
CLUSTER_STATE,
|
||||
new String[] {},
|
||||
"dest",
|
||||
SourceDestValidator.ALL_VALIDATIONS,
|
||||
listener
|
||||
),
|
||||
(Boolean) null,
|
||||
e -> {
|
||||
assertEquals(1, e.validationErrors().size());
|
||||
assertThat(e.validationErrors().get(0), equalTo("Source index [] does not exist"));
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
public void testCheck_GivenMissingConcreteSourceIndex() throws InterruptedException {
|
||||
assertValidation(
|
||||
listener -> simpleNonRemoteValidator.validate(
|
||||
CLUSTER_STATE,
|
||||
new String[] { "missing" },
|
||||
"dest",
|
||||
SourceDestValidator.ALL_VALIDATIONS,
|
||||
listener
|
||||
),
|
||||
(Boolean) null,
|
||||
e -> {
|
||||
assertEquals(1, e.validationErrors().size());
|
||||
assertThat(e.validationErrors().get(0), equalTo("no such index [missing]"));
|
||||
}
|
||||
);
|
||||
|
||||
assertValidation(
|
||||
listener -> simpleNonRemoteValidator.validate(
|
||||
CLUSTER_STATE,
|
||||
new String[] { "missing" },
|
||||
"dest",
|
||||
SourceDestValidator.NON_DEFERABLE_VALIDATIONS,
|
||||
listener
|
||||
),
|
||||
true,
|
||||
null
|
||||
);
|
||||
}
|
||||
|
||||
public void testCheck_GivenMixedMissingAndExistingConcreteSourceIndex() throws InterruptedException {
|
||||
assertValidation(
|
||||
listener -> simpleNonRemoteValidator.validate(
|
||||
CLUSTER_STATE,
|
||||
new String[] { SOURCE_1, "missing" },
|
||||
"dest",
|
||||
SourceDestValidator.ALL_VALIDATIONS,
|
||||
listener
|
||||
),
|
||||
(Boolean) null,
|
||||
e -> {
|
||||
assertEquals(1, e.validationErrors().size());
|
||||
assertThat(e.validationErrors().get(0), equalTo("no such index [missing]"));
|
||||
}
|
||||
);
|
||||
|
||||
assertValidation(
|
||||
listener -> simpleNonRemoteValidator.validate(
|
||||
CLUSTER_STATE,
|
||||
new String[] { SOURCE_1, "missing" },
|
||||
"dest",
|
||||
SourceDestValidator.NON_DEFERABLE_VALIDATIONS,
|
||||
listener
|
||||
),
|
||||
true,
|
||||
null
|
||||
);
|
||||
}
|
||||
|
||||
public void testCheck_GivenMixedMissingWildcardExistingConcreteSourceIndex() throws InterruptedException {
|
||||
assertValidation(
|
||||
listener -> simpleNonRemoteValidator.validate(
|
||||
CLUSTER_STATE,
|
||||
new String[] { SOURCE_1, "wildcard*", "missing" },
|
||||
"dest",
|
||||
SourceDestValidator.ALL_VALIDATIONS,
|
||||
listener
|
||||
),
|
||||
(Boolean) null,
|
||||
e -> {
|
||||
assertEquals(1, e.validationErrors().size());
|
||||
assertThat(e.validationErrors().get(0), equalTo("no such index [missing]"));
|
||||
}
|
||||
);
|
||||
|
||||
assertValidation(
|
||||
listener -> simpleNonRemoteValidator.validate(
|
||||
CLUSTER_STATE,
|
||||
new String[] { SOURCE_1, "wildcard*", "missing" },
|
||||
"dest",
|
||||
SourceDestValidator.NON_DEFERABLE_VALIDATIONS,
|
||||
listener
|
||||
),
|
||||
true,
|
||||
null
|
||||
);
|
||||
}
|
||||
|
||||
public void testCheck_GivenWildcardSourceIndex() throws InterruptedException {
|
||||
assertValidation(
|
||||
listener -> simpleNonRemoteValidator.validate(
|
||||
CLUSTER_STATE,
|
||||
new String[] { "wildcard*" },
|
||||
"dest",
|
||||
SourceDestValidator.ALL_VALIDATIONS,
|
||||
listener
|
||||
),
|
||||
true,
|
||||
null
|
||||
);
|
||||
}
|
||||
|
||||
public void testCheck_GivenDestIndexSameAsSourceIndex() throws InterruptedException {
|
||||
assertValidation(
|
||||
listener -> simpleNonRemoteValidator.validate(
|
||||
CLUSTER_STATE,
|
||||
new String[] { SOURCE_1 },
|
||||
SOURCE_1,
|
||||
SourceDestValidator.ALL_VALIDATIONS,
|
||||
listener
|
||||
),
|
||||
(Boolean) null,
|
||||
e -> {
|
||||
assertEquals(1, e.validationErrors().size());
|
||||
assertThat(
|
||||
e.validationErrors().get(0),
|
||||
equalTo("Destination index [" + SOURCE_1 + "] is included in source expression [" + SOURCE_1 + "]")
|
||||
);
|
||||
}
|
||||
);
|
||||
|
||||
assertValidation(
|
||||
listener -> simpleNonRemoteValidator.validate(
|
||||
CLUSTER_STATE,
|
||||
new String[] { SOURCE_1 },
|
||||
SOURCE_1,
|
||||
SourceDestValidator.NON_DEFERABLE_VALIDATIONS,
|
||||
listener
|
||||
),
|
||||
true,
|
||||
null
|
||||
);
|
||||
}
|
||||
|
||||
public void testCheck_GivenDestIndexMatchesSourceIndex() throws InterruptedException {
|
||||
assertValidation(
|
||||
listener -> simpleNonRemoteValidator.validate(
|
||||
CLUSTER_STATE,
|
||||
new String[] { "source-*" },
|
||||
SOURCE_2,
|
||||
SourceDestValidator.ALL_VALIDATIONS,
|
||||
listener
|
||||
),
|
||||
(Boolean) null,
|
||||
e -> {
|
||||
assertEquals(1, e.validationErrors().size());
|
||||
assertThat(
|
||||
e.validationErrors().get(0),
|
||||
equalTo("Destination index [" + SOURCE_2 + "] is included in source expression [source-*]")
|
||||
);
|
||||
}
|
||||
);
|
||||
|
||||
assertValidation(
|
||||
listener -> simpleNonRemoteValidator.validate(
|
||||
CLUSTER_STATE,
|
||||
new String[] { "source-*" },
|
||||
SOURCE_2,
|
||||
SourceDestValidator.NON_DEFERABLE_VALIDATIONS,
|
||||
listener
|
||||
),
|
||||
true,
|
||||
null
|
||||
);
|
||||
}
|
||||
|
||||
public void testCheck_GivenDestIndexMatchesOneOfSourceIndices() throws InterruptedException {
|
||||
assertValidation(
|
||||
listener -> simpleNonRemoteValidator.validate(
|
||||
CLUSTER_STATE,
|
||||
new String[] { "source-1", "source-*" },
|
||||
SOURCE_2,
|
||||
SourceDestValidator.ALL_VALIDATIONS,
|
||||
listener
|
||||
),
|
||||
(Boolean) null,
|
||||
e -> {
|
||||
assertEquals(1, e.validationErrors().size());
|
||||
assertThat(
|
||||
e.validationErrors().get(0),
|
||||
equalTo("Destination index [" + SOURCE_2 + "] is included in source expression [source-*]")
|
||||
);
|
||||
}
|
||||
);
|
||||
|
||||
assertValidation(
|
||||
listener -> simpleNonRemoteValidator.validate(
|
||||
CLUSTER_STATE,
|
||||
new String[] { "source-1", "source-*" },
|
||||
SOURCE_2,
|
||||
SourceDestValidator.NON_DEFERABLE_VALIDATIONS,
|
||||
listener
|
||||
),
|
||||
true,
|
||||
null
|
||||
);
|
||||
}
|
||||
|
||||
public void testCheck_GivenDestIndexMatchesMultipleSourceIndices() throws InterruptedException {
|
||||
assertValidation(
|
||||
listener -> simpleNonRemoteValidator.validate(
|
||||
CLUSTER_STATE,
|
||||
new String[] { "source-1", "source-*", "sou*" },
|
||||
SOURCE_2,
|
||||
SourceDestValidator.ALL_VALIDATIONS,
|
||||
listener
|
||||
),
|
||||
(Boolean) null,
|
||||
e -> {
|
||||
assertEquals(2, e.validationErrors().size());
|
||||
assertThat(
|
||||
e.validationErrors().get(0),
|
||||
equalTo("Destination index [" + SOURCE_2 + "] is included in source expression [source-*]")
|
||||
);
|
||||
assertThat(
|
||||
e.validationErrors().get(1),
|
||||
equalTo("Destination index [" + SOURCE_2 + "] is included in source expression [sou*]")
|
||||
);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
public void testCheck_GivenDestIndexIsAliasThatMatchesMultipleIndices() throws InterruptedException {
|
||||
assertValidation(
|
||||
listener -> simpleNonRemoteValidator.validate(
|
||||
CLUSTER_STATE,
|
||||
new String[] { SOURCE_1 },
|
||||
DEST_ALIAS,
|
||||
SourceDestValidator.ALL_VALIDATIONS,
|
||||
listener
|
||||
),
|
||||
(Boolean) null,
|
||||
e -> {
|
||||
assertEquals(1, e.validationErrors().size());
|
||||
assertThat(
|
||||
e.validationErrors().get(0),
|
||||
equalTo(
|
||||
"no write index is defined for alias [dest-alias]. "
|
||||
+ "The write index may be explicitly disabled using is_write_index=false or the alias points "
|
||||
+ "to multiple indices without one being designated as a write index"
|
||||
)
|
||||
);
|
||||
}
|
||||
);
|
||||
|
||||
assertValidation(
|
||||
listener -> simpleNonRemoteValidator.validate(
|
||||
CLUSTER_STATE,
|
||||
new String[] { SOURCE_1 },
|
||||
DEST_ALIAS,
|
||||
SourceDestValidator.NON_DEFERABLE_VALIDATIONS,
|
||||
listener
|
||||
),
|
||||
(Boolean) null,
|
||||
e -> {
|
||||
assertEquals(1, e.validationErrors().size());
|
||||
assertThat(
|
||||
e.validationErrors().get(0),
|
||||
equalTo(
|
||||
"no write index is defined for alias [dest-alias]. "
|
||||
+ "The write index may be explicitly disabled using is_write_index=false or the alias points "
|
||||
+ "to multiple indices without one being designated as a write index"
|
||||
)
|
||||
);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
public void testCheck_GivenDestIndexIsAliasThatMatchesMultipleIndicesButHasSingleWriteAlias() throws InterruptedException {
|
||||
assertValidation(
|
||||
listener -> simpleNonRemoteValidator.validate(
|
||||
CLUSTER_STATE,
|
||||
new String[] { SOURCE_1 },
|
||||
ALIAS_READ_WRITE_DEST,
|
||||
SourceDestValidator.ALL_VALIDATIONS,
|
||||
listener
|
||||
),
|
||||
(Boolean) null,
|
||||
e -> {
|
||||
assertEquals(1, e.validationErrors().size());
|
||||
assertThat(
|
||||
e.validationErrors().get(0),
|
||||
equalTo(
|
||||
"no write index is defined for alias ["
|
||||
+ ALIAS_READ_WRITE_DEST
|
||||
+ "]. "
|
||||
+ "The write index may be explicitly disabled using is_write_index=false or the alias points "
|
||||
+ "to multiple indices without one being designated as a write index"
|
||||
)
|
||||
);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
public void testCheck_GivenDestIndexIsAliasThatIsIncludedInSource() throws InterruptedException {
|
||||
assertValidation(
|
||||
listener -> simpleNonRemoteValidator.validate(
|
||||
CLUSTER_STATE,
|
||||
new String[] { SOURCE_1 },
|
||||
SOURCE_1_ALIAS,
|
||||
SourceDestValidator.ALL_VALIDATIONS,
|
||||
listener
|
||||
),
|
||||
(Boolean) null,
|
||||
e -> {
|
||||
assertEquals(1, e.validationErrors().size());
|
||||
assertThat(
|
||||
e.validationErrors().get(0),
|
||||
equalTo("Destination index [" + SOURCE_1 + "] is included in source expression [" + SOURCE_1 + "]")
|
||||
);
|
||||
}
|
||||
);
|
||||
|
||||
assertValidation(
|
||||
listener -> simpleNonRemoteValidator.validate(
|
||||
CLUSTER_STATE,
|
||||
new String[] { SOURCE_1 },
|
||||
SOURCE_1_ALIAS,
|
||||
SourceDestValidator.NON_DEFERABLE_VALIDATIONS,
|
||||
listener
|
||||
),
|
||||
true,
|
||||
null
|
||||
);
|
||||
}
|
||||
|
||||
public void testCheck_MultipleValidationErrors() throws InterruptedException {
|
||||
assertValidation(
|
||||
listener -> simpleNonRemoteValidator.validate(
|
||||
CLUSTER_STATE,
|
||||
new String[] { SOURCE_1, "missing" },
|
||||
SOURCE_1_ALIAS,
|
||||
SourceDestValidator.ALL_VALIDATIONS,
|
||||
listener
|
||||
),
|
||||
(Boolean) null,
|
||||
e -> {
|
||||
assertEquals(2, e.validationErrors().size());
|
||||
assertThat(e.validationErrors().get(0), equalTo("no such index [missing]"));
|
||||
assertThat(
|
||||
e.validationErrors().get(1),
|
||||
equalTo("Destination index [" + SOURCE_1 + "] is included in source expression [missing,source-1]")
|
||||
);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// CCS tests: at time of writing it wasn't possible to mock RemoteClusterService, therefore it's not possible
|
||||
// to test the whole validation but test RemoteSourceEnabledAndRemoteLicenseValidation
|
||||
public void testRemoteSourceBasic() throws InterruptedException {
|
||||
Context context = spy(
|
||||
new SourceDestValidator.Context(
|
||||
CLUSTER_STATE,
|
||||
new IndexNameExpressionResolver(),
|
||||
remoteClusterService,
|
||||
remoteClusterLicenseCheckerBasic,
|
||||
new String[] { REMOTE_BASIC + ":" + "SOURCE_1" },
|
||||
"dest",
|
||||
"node_id",
|
||||
"license"
|
||||
)
|
||||
);
|
||||
|
||||
when(context.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_BASIC));
|
||||
RemoteSourceEnabledAndRemoteLicenseValidation validator = new RemoteSourceEnabledAndRemoteLicenseValidation();
|
||||
|
||||
assertValidationWithContext(
|
||||
listener -> validator.validate(context, listener),
|
||||
c -> { assertNull(c.getValidationException()); },
|
||||
null
|
||||
);
|
||||
}
|
||||
|
||||
public void testRemoteSourcePlatinum() throws InterruptedException {
|
||||
final Context context = spy(
|
||||
new SourceDestValidator.Context(
|
||||
CLUSTER_STATE,
|
||||
new IndexNameExpressionResolver(),
|
||||
remoteClusterService,
|
||||
new RemoteClusterLicenseChecker(clientWithBasicLicense, XPackLicenseState::isPlatinumOrTrialOperationMode),
|
||||
new String[] { REMOTE_BASIC + ":" + "SOURCE_1" },
|
||||
"dest",
|
||||
"node_id",
|
||||
"platinum"
|
||||
)
|
||||
);
|
||||
|
||||
when(context.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_BASIC));
|
||||
final RemoteSourceEnabledAndRemoteLicenseValidation validator = new RemoteSourceEnabledAndRemoteLicenseValidation();
|
||||
|
||||
assertValidationWithContext(listener -> validator.validate(context, listener), c -> {
|
||||
assertNotNull(c.getValidationException());
|
||||
assertEquals(1, c.getValidationException().validationErrors().size());
|
||||
assertThat(
|
||||
c.getValidationException().validationErrors().get(0),
|
||||
equalTo(
|
||||
"License check failed for remote cluster alias ["
|
||||
+ REMOTE_BASIC
|
||||
+ "], at least a [platinum] license is required, found license [basic]"
|
||||
)
|
||||
);
|
||||
}, null);
|
||||
|
||||
final Context context2 = spy(
|
||||
new SourceDestValidator.Context(
|
||||
CLUSTER_STATE,
|
||||
new IndexNameExpressionResolver(),
|
||||
remoteClusterService,
|
||||
new RemoteClusterLicenseChecker(clientWithPlatinumLicense, XPackLicenseState::isPlatinumOrTrialOperationMode),
|
||||
new String[] { REMOTE_PLATINUM + ":" + "SOURCE_1" },
|
||||
"dest",
|
||||
"node_id",
|
||||
"license"
|
||||
)
|
||||
);
|
||||
when(context2.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_PLATINUM));
|
||||
|
||||
assertValidationWithContext(
|
||||
listener -> validator.validate(context2, listener),
|
||||
c -> { assertNull(c.getValidationException()); },
|
||||
null
|
||||
);
|
||||
|
||||
final Context context3 = spy(
|
||||
new SourceDestValidator.Context(
|
||||
CLUSTER_STATE,
|
||||
new IndexNameExpressionResolver(),
|
||||
remoteClusterService,
|
||||
new RemoteClusterLicenseChecker(clientWithPlatinumLicense, XPackLicenseState::isPlatinumOrTrialOperationMode),
|
||||
new String[] { REMOTE_PLATINUM + ":" + "SOURCE_1" },
|
||||
"dest",
|
||||
"node_id",
|
||||
"platinum"
|
||||
)
|
||||
);
|
||||
when(context3.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_PLATINUM));
|
||||
|
||||
final RemoteSourceEnabledAndRemoteLicenseValidation validator3 = new RemoteSourceEnabledAndRemoteLicenseValidation();
|
||||
assertValidationWithContext(
|
||||
listener -> validator3.validate(context3, listener),
|
||||
c -> { assertNull(c.getValidationException()); },
|
||||
null
|
||||
);
|
||||
|
||||
final Context context4 = spy(
|
||||
new SourceDestValidator.Context(
|
||||
CLUSTER_STATE,
|
||||
new IndexNameExpressionResolver(),
|
||||
remoteClusterService,
|
||||
new RemoteClusterLicenseChecker(clientWithTrialLicense, XPackLicenseState::isPlatinumOrTrialOperationMode),
|
||||
new String[] { REMOTE_PLATINUM + ":" + "SOURCE_1" },
|
||||
"dest",
|
||||
"node_id",
|
||||
"trial"
|
||||
)
|
||||
);
|
||||
when(context4.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_PLATINUM));
|
||||
|
||||
final RemoteSourceEnabledAndRemoteLicenseValidation validator4 = new RemoteSourceEnabledAndRemoteLicenseValidation();
|
||||
assertValidationWithContext(
|
||||
listener -> validator4.validate(context4, listener),
|
||||
c -> { assertNull(c.getValidationException()); },
|
||||
null
|
||||
);
|
||||
}
|
||||
|
||||
public void testRemoteSourceLicenseInActive() throws InterruptedException {
|
||||
final Context context = spy(
|
||||
new SourceDestValidator.Context(
|
||||
CLUSTER_STATE,
|
||||
new IndexNameExpressionResolver(),
|
||||
remoteClusterService,
|
||||
new RemoteClusterLicenseChecker(clientWithExpiredBasicLicense, XPackLicenseState::isPlatinumOrTrialOperationMode),
|
||||
new String[] { REMOTE_BASIC + ":" + "SOURCE_1" },
|
||||
"dest",
|
||||
"node_id",
|
||||
"license"
|
||||
)
|
||||
);
|
||||
|
||||
when(context.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_BASIC));
|
||||
final RemoteSourceEnabledAndRemoteLicenseValidation validator = new RemoteSourceEnabledAndRemoteLicenseValidation();
|
||||
assertValidationWithContext(listener -> validator.validate(context, listener), c -> {
|
||||
assertNotNull(c.getValidationException());
|
||||
assertEquals(1, c.getValidationException().validationErrors().size());
|
||||
assertThat(
|
||||
c.getValidationException().validationErrors().get(0),
|
||||
equalTo("License check failed for remote cluster alias [" + REMOTE_BASIC + "], license is not active")
|
||||
);
|
||||
}, null);
|
||||
}
|
||||
|
||||
public void testRemoteSourceDoesNotExist() throws InterruptedException {
|
||||
Context context = spy(
|
||||
new SourceDestValidator.Context(
|
||||
CLUSTER_STATE,
|
||||
new IndexNameExpressionResolver(),
|
||||
remoteClusterService,
|
||||
new RemoteClusterLicenseChecker(clientWithExpiredBasicLicense, XPackLicenseState::isPlatinumOrTrialOperationMode),
|
||||
new String[] { "non_existing_remote:" + "SOURCE_1" },
|
||||
"dest",
|
||||
"node_id",
|
||||
"license"
|
||||
)
|
||||
);
|
||||
|
||||
when(context.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_BASIC));
|
||||
RemoteSourceEnabledAndRemoteLicenseValidation validator = new RemoteSourceEnabledAndRemoteLicenseValidation();
|
||||
|
||||
assertValidationWithContext(listener -> validator.validate(context, listener), c -> {
|
||||
assertNotNull(c.getValidationException());
|
||||
assertEquals(1, c.getValidationException().validationErrors().size());
|
||||
assertThat(c.getValidationException().validationErrors().get(0), equalTo("no such remote cluster: [non_existing_remote]"));
|
||||
}, null);
|
||||
}
|
||||
|
||||
public void testRequestValidation() {
|
||||
ActionRequestValidationException validationException = SourceDestValidator.validateRequest(null, "UPPERCASE");
|
||||
assertNotNull(validationException);
|
||||
assertEquals(1, validationException.validationErrors().size());
|
||||
assertThat(validationException.validationErrors().get(0), equalTo("Destination index [UPPERCASE] must be lowercase"));
|
||||
|
||||
validationException = SourceDestValidator.validateRequest(null, "remote:dest");
|
||||
assertNotNull(validationException);
|
||||
assertEquals(1, validationException.validationErrors().size());
|
||||
assertThat(validationException.validationErrors().get(0), equalTo("Invalid index name [remote:dest], must not contain ':'"));
|
||||
|
||||
validationException = SourceDestValidator.validateRequest(null, "dest");
|
||||
assertNull(validationException);
|
||||
|
||||
validationException = new ActionRequestValidationException();
|
||||
validationException.addValidationError("error1");
|
||||
validationException.addValidationError("error2");
|
||||
validationException = SourceDestValidator.validateRequest(validationException, "dest");
|
||||
assertNotNull(validationException);
|
||||
assertEquals(2, validationException.validationErrors().size());
|
||||
assertEquals(validationException.validationErrors().get(0), "error1");
|
||||
assertEquals(validationException.validationErrors().get(1), "error2");
|
||||
|
||||
validationException = SourceDestValidator.validateRequest(validationException, "UPPERCASE");
|
||||
assertNotNull(validationException);
|
||||
assertEquals(3, validationException.validationErrors().size());
|
||||
assertThat(validationException.validationErrors().get(2), equalTo("Destination index [UPPERCASE] must be lowercase"));
|
||||
}
|
||||
|
||||
private <T> void assertValidation(Consumer<ActionListener<T>> function, T expected, Consumer<ValidationException> onException)
|
||||
throws InterruptedException {
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
AtomicBoolean listenerCalled = new AtomicBoolean(false);
|
||||
|
||||
LatchedActionListener<T> listener = new LatchedActionListener<>(ActionListener.wrap(r -> {
|
||||
assertTrue("listener called more than once", listenerCalled.compareAndSet(false, true));
|
||||
if (expected == null) {
|
||||
fail("expected an exception but got a response");
|
||||
} else {
|
||||
assertThat(r, equalTo(expected));
|
||||
}
|
||||
}, e -> {
|
||||
assertTrue("listener called more than once", listenerCalled.compareAndSet(false, true));
|
||||
if (onException == null) {
|
||||
logger.error("got unexpected exception", e);
|
||||
fail("got unexpected exception: " + e.getMessage());
|
||||
} else if (e instanceof ValidationException) {
|
||||
onException.accept((ValidationException) e);
|
||||
} else {
|
||||
fail("got unexpected exception type: " + e);
|
||||
}
|
||||
}), latch);
|
||||
|
||||
function.accept(listener);
|
||||
assertTrue("timed out after 20s", latch.await(20, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
private void assertValidationWithContext(
|
||||
Consumer<ActionListener<Context>> function,
|
||||
CheckedConsumer<Context, ? extends Exception> onAnswer,
|
||||
Consumer<ValidationException> onException
|
||||
) throws InterruptedException {
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
AtomicBoolean listenerCalled = new AtomicBoolean(false);
|
||||
|
||||
LatchedActionListener<Context> listener = new LatchedActionListener<>(ActionListener.wrap(r -> {
|
||||
assertTrue("listener called more than once", listenerCalled.compareAndSet(false, true));
|
||||
if (onAnswer == null) {
|
||||
fail("expected an exception but got a response");
|
||||
} else {
|
||||
onAnswer.accept(r);
|
||||
}
|
||||
}, e -> {
|
||||
assertTrue("listener called more than once", listenerCalled.compareAndSet(false, true));
|
||||
if (onException == null) {
|
||||
logger.error("got unexpected exception", e);
|
||||
fail("got unexpected exception: " + e.getMessage());
|
||||
} else if (e instanceof ValidationException) {
|
||||
onException.accept((ValidationException) e);
|
||||
} else {
|
||||
fail("got unexpected exception type: " + e);
|
||||
}
|
||||
}), latch);
|
||||
|
||||
function.accept(listener);
|
||||
assertTrue("timed out after 20s", latch.await(20, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
|
@ -166,7 +166,7 @@ setup:
|
|||
---
|
||||
"Test preview with non-existing source index":
|
||||
- do:
|
||||
catch: /Source index \[does_not_exist\] does not exist/
|
||||
catch: /.*reason=Validation Failed.* no such index \[does_not_exist\]/
|
||||
transform.preview_transform:
|
||||
body: >
|
||||
{
|
||||
|
|
|
@ -79,7 +79,7 @@ setup:
|
|||
---
|
||||
"Test put transform with invalid source index":
|
||||
- do:
|
||||
catch: /Source index \[missing-index\] does not exist/
|
||||
catch: /.*reason=Validation Failed.* no such index \[missing-index\]/
|
||||
transform.put_transform:
|
||||
transform_id: "missing-source-transform"
|
||||
body: >
|
||||
|
@ -384,7 +384,7 @@ setup:
|
|||
name: source-index
|
||||
|
||||
- do:
|
||||
catch: /Destination index \[created-destination-index\] is included in source expression \[airline-data,created-destination-index\]/
|
||||
catch: /.*reason=Validation Failed.* Destination index \[created-destination-index\] is included in source expression \[airline-data,created-destination-index\]/
|
||||
transform.put_transform:
|
||||
transform_id: "transform-from-aliases-failures"
|
||||
body: >
|
||||
|
@ -410,7 +410,7 @@ setup:
|
|||
name: dest-index
|
||||
|
||||
- do:
|
||||
catch: /Destination index \[dest-index\] should refer to a single index/
|
||||
catch: /.*reason=Validation Failed.* no write index is defined for alias [dest2-index].*/
|
||||
transform.put_transform:
|
||||
transform_id: "airline-transform"
|
||||
body: >
|
||||
|
@ -521,7 +521,7 @@ setup:
|
|||
---
|
||||
"Test invalid destination index name":
|
||||
- do:
|
||||
catch: /dest\.index \[DeStInAtIoN\] must be lowercase/
|
||||
catch: /.*reason=Validation Failed.* Destination index \[DeStInAtIoN\] must be lowercase/
|
||||
transform.put_transform:
|
||||
transform_id: "airline-transform"
|
||||
body: >
|
||||
|
|
|
@ -67,7 +67,7 @@ setup:
|
|||
---
|
||||
"Test put transform with invalid source index":
|
||||
- do:
|
||||
catch: /Source index \[missing-index\] does not exist/
|
||||
catch: /.*reason=Validation Failed.* no such index \[missing-index\]/
|
||||
transform.update_transform:
|
||||
transform_id: "updating-airline-transform"
|
||||
body: >
|
||||
|
@ -255,7 +255,7 @@ setup:
|
|||
name: source2-index
|
||||
|
||||
- do:
|
||||
catch: /Destination index \[created-destination-index\] is included in source expression \[created-destination-index\]/
|
||||
catch: /.*reason=Validation Failed.* Destination index \[created-destination-index\] is included in source expression \[created-destination-index\]/
|
||||
transform.update_transform:
|
||||
transform_id: "updating-airline-transform"
|
||||
body: >
|
||||
|
@ -280,7 +280,7 @@ setup:
|
|||
index: created-destination-index
|
||||
name: dest2-index
|
||||
- do:
|
||||
catch: /Destination index \[dest2-index\] should refer to a single index/
|
||||
catch: /.*reason=Validation Failed.* no write index is defined for alias [dest2-index].*/
|
||||
transform.update_transform:
|
||||
transform_id: "updating-airline-transform"
|
||||
body: >
|
||||
|
@ -290,7 +290,7 @@ setup:
|
|||
---
|
||||
"Test invalid destination index name":
|
||||
- do:
|
||||
catch: /dest\.index \[DeStInAtIoN\] must be lowercase/
|
||||
catch: /.*reason=Validation Failed.* Destination index \[DeStInAtIoN\] must be lowercase/
|
||||
transform.update_transform:
|
||||
transform_id: "updating-airline-transform"
|
||||
body: >
|
||||
|
@ -298,7 +298,7 @@ setup:
|
|||
"dest": { "index": "DeStInAtIoN" }
|
||||
}
|
||||
- do:
|
||||
catch: /Invalid index name \[destination#dest\], must not contain \'#\'/
|
||||
catch: /.*reason=Validation Failed.* Invalid index name \[destination#dest\], must not contain \'#\'/
|
||||
transform.update_transform:
|
||||
transform_id: "updating-airline-transform"
|
||||
body: >
|
||||
|
|
|
@ -17,29 +17,33 @@ import org.elasticsearch.action.ingest.SimulatePipelineResponse;
|
|||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
import org.elasticsearch.license.License;
|
||||
import org.elasticsearch.license.LicenseUtils;
|
||||
import org.elasticsearch.license.RemoteClusterLicenseChecker;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.search.aggregations.Aggregations;
|
||||
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.RemoteClusterService;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.XPackField;
|
||||
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
|
||||
import org.elasticsearch.xpack.core.transform.TransformField;
|
||||
import org.elasticsearch.xpack.core.transform.TransformMessages;
|
||||
import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction;
|
||||
|
@ -66,8 +70,8 @@ public class TransportPreviewTransformAction extends
|
|||
private final XPackLicenseState licenseState;
|
||||
private final Client client;
|
||||
private final ThreadPool threadPool;
|
||||
private final IndexNameExpressionResolver indexNameExpressionResolver;
|
||||
private final ClusterService clusterService;
|
||||
private final SourceDestValidator sourceDestValidator;
|
||||
|
||||
@Inject
|
||||
public TransportPreviewTransformAction(
|
||||
|
@ -77,7 +81,8 @@ public class TransportPreviewTransformAction extends
|
|||
ThreadPool threadPool,
|
||||
XPackLicenseState licenseState,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
ClusterService clusterService
|
||||
ClusterService clusterService,
|
||||
Settings settings
|
||||
) {
|
||||
this(
|
||||
PreviewTransformAction.NAME,
|
||||
|
@ -87,7 +92,8 @@ public class TransportPreviewTransformAction extends
|
|||
threadPool,
|
||||
licenseState,
|
||||
indexNameExpressionResolver,
|
||||
clusterService
|
||||
clusterService,
|
||||
settings
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -99,14 +105,23 @@ public class TransportPreviewTransformAction extends
|
|||
ThreadPool threadPool,
|
||||
XPackLicenseState licenseState,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
ClusterService clusterService
|
||||
ClusterService clusterService,
|
||||
Settings settings
|
||||
) {
|
||||
super(name, transportService, actionFilters, PreviewTransformAction.Request::new);
|
||||
this.licenseState = licenseState;
|
||||
this.client = client;
|
||||
this.threadPool = threadPool;
|
||||
this.clusterService = clusterService;
|
||||
this.indexNameExpressionResolver = indexNameExpressionResolver;
|
||||
this.sourceDestValidator = new SourceDestValidator(
|
||||
indexNameExpressionResolver,
|
||||
transportService.getRemoteClusterService(),
|
||||
RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings)
|
||||
? new RemoteClusterLicenseChecker(client, XPackLicenseState::isTransformAllowedForOperationMode)
|
||||
: null,
|
||||
clusterService.getNodeName(),
|
||||
License.OperationMode.BASIC.description()
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -119,39 +134,41 @@ public class TransportPreviewTransformAction extends
|
|||
ClusterState clusterState = clusterService.state();
|
||||
|
||||
final TransformConfig config = request.getConfig();
|
||||
for (String src : config.getSource().getIndex()) {
|
||||
String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), src);
|
||||
if (concreteNames.length == 0) {
|
||||
listener.onFailure(
|
||||
new ElasticsearchStatusException(
|
||||
TransformMessages.getMessage(TransformMessages.REST_PUT_TRANSFORM_SOURCE_INDEX_MISSING, src),
|
||||
RestStatus.BAD_REQUEST
|
||||
)
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
Pivot pivot = new Pivot(config.getPivotConfig());
|
||||
try {
|
||||
pivot.validateConfig();
|
||||
} catch (ElasticsearchStatusException e) {
|
||||
listener.onFailure(
|
||||
new ElasticsearchStatusException(TransformMessages.REST_PUT_TRANSFORM_FAILED_TO_VALIDATE_CONFIGURATION, e.status(), e)
|
||||
);
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(
|
||||
new ElasticsearchStatusException(
|
||||
TransformMessages.REST_PUT_TRANSFORM_FAILED_TO_VALIDATE_CONFIGURATION,
|
||||
RestStatus.INTERNAL_SERVER_ERROR,
|
||||
e
|
||||
)
|
||||
);
|
||||
return;
|
||||
}
|
||||
sourceDestValidator.validate(
|
||||
clusterState,
|
||||
config.getSource().getIndex(),
|
||||
config.getDestination().getIndex(),
|
||||
SourceDestValidator.PREVIEW_VALIDATIONS,
|
||||
ActionListener.wrap(r -> {
|
||||
|
||||
getPreview(pivot, config.getSource(), config.getDestination().getPipeline(), config.getDestination().getIndex(), listener);
|
||||
Pivot pivot = new Pivot(config.getPivotConfig());
|
||||
try {
|
||||
pivot.validateConfig();
|
||||
} catch (ElasticsearchStatusException e) {
|
||||
listener.onFailure(
|
||||
new ElasticsearchStatusException(
|
||||
TransformMessages.REST_PUT_TRANSFORM_FAILED_TO_VALIDATE_CONFIGURATION,
|
||||
e.status(),
|
||||
e
|
||||
)
|
||||
);
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(
|
||||
new ElasticsearchStatusException(
|
||||
TransformMessages.REST_PUT_TRANSFORM_FAILED_TO_VALIDATE_CONFIGURATION,
|
||||
RestStatus.INTERNAL_SERVER_ERROR,
|
||||
e
|
||||
)
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
getPreview(pivot, config.getSource(), config.getDestination().getPipeline(), config.getDestination().getIndex(), listener);
|
||||
|
||||
}, listener::onFailure)
|
||||
);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -216,10 +233,7 @@ public class TransportPreviewTransformAction extends
|
|||
builder.startObject();
|
||||
builder.field("docs", results);
|
||||
builder.endObject();
|
||||
SimulatePipelineRequest pipelineRequest = new SimulatePipelineRequest(
|
||||
BytesReference.bytes(builder),
|
||||
XContentType.JSON
|
||||
);
|
||||
SimulatePipelineRequest pipelineRequest = new SimulatePipelineRequest(BytesReference.bytes(builder), XContentType.JSON);
|
||||
pipelineRequest.setId(pipeline);
|
||||
ClientHelper.executeAsyncWithOrigin(
|
||||
client,
|
||||
|
|
|
@ -26,16 +26,20 @@ import org.elasticsearch.common.Strings;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.license.License;
|
||||
import org.elasticsearch.license.LicenseUtils;
|
||||
import org.elasticsearch.license.RemoteClusterLicenseChecker;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.RemoteClusterService;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.XPackField;
|
||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||
import org.elasticsearch.xpack.core.XPackSettings;
|
||||
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
|
||||
import org.elasticsearch.xpack.core.security.SecurityContext;
|
||||
import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction;
|
||||
import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest;
|
||||
|
@ -50,7 +54,6 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
|||
import org.elasticsearch.xpack.transform.TransformServices;
|
||||
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
|
||||
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
|
||||
import org.elasticsearch.xpack.transform.transforms.SourceDestValidator;
|
||||
import org.elasticsearch.xpack.transform.transforms.pivot.Pivot;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -69,6 +72,7 @@ public class TransportPutTransformAction extends TransportMasterNodeAction<Reque
|
|||
private final TransformConfigManager transformConfigManager;
|
||||
private final SecurityContext securityContext;
|
||||
private final TransformAuditor auditor;
|
||||
private final SourceDestValidator sourceDestValidator;
|
||||
|
||||
@Inject
|
||||
public TransportPutTransformAction(
|
||||
|
@ -124,6 +128,15 @@ public class TransportPutTransformAction extends TransportMasterNodeAction<Reque
|
|||
? new SecurityContext(settings, threadPool.getThreadContext())
|
||||
: null;
|
||||
this.auditor = transformServices.getAuditor();
|
||||
this.sourceDestValidator = new SourceDestValidator(
|
||||
indexNameExpressionResolver,
|
||||
transportService.getRemoteClusterService(),
|
||||
RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings)
|
||||
? new RemoteClusterLicenseChecker(client, XPackLicenseState::isTransformAllowedForOperationMode)
|
||||
: null,
|
||||
clusterService.getNodeName(),
|
||||
License.OperationMode.BASIC.description()
|
||||
);
|
||||
}
|
||||
|
||||
static HasPrivilegesRequest buildPrivilegeCheck(
|
||||
|
@ -208,26 +221,31 @@ public class TransportPutTransformAction extends TransportMasterNodeAction<Reque
|
|||
);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
SourceDestValidator.validate(config, clusterState, indexNameExpressionResolver, request.isDeferValidation());
|
||||
} catch (ElasticsearchStatusException ex) {
|
||||
listener.onFailure(ex);
|
||||
return;
|
||||
}
|
||||
|
||||
// Early check to verify that the user can create the destination index and can read from the source
|
||||
if (licenseState.isAuthAllowed() && request.isDeferValidation() == false) {
|
||||
final String username = securityContext.getUser().principal();
|
||||
HasPrivilegesRequest privRequest = buildPrivilegeCheck(config, indexNameExpressionResolver, clusterState, username);
|
||||
ActionListener<HasPrivilegesResponse> privResponseListener = ActionListener.wrap(
|
||||
r -> handlePrivsResponse(username, request, r, listener),
|
||||
sourceDestValidator.validate(
|
||||
clusterState,
|
||||
config.getSource().getIndex(),
|
||||
config.getDestination().getIndex(),
|
||||
request.isDeferValidation() ? SourceDestValidator.NON_DEFERABLE_VALIDATIONS : SourceDestValidator.ALL_VALIDATIONS,
|
||||
ActionListener.wrap(
|
||||
validationResponse -> {
|
||||
// Early check to verify that the user can create the destination index and can read from the source
|
||||
if (licenseState.isAuthAllowed() && request.isDeferValidation() == false) {
|
||||
final String username = securityContext.getUser().principal();
|
||||
HasPrivilegesRequest privRequest = buildPrivilegeCheck(config, indexNameExpressionResolver, clusterState, username);
|
||||
ActionListener<HasPrivilegesResponse> privResponseListener = ActionListener.wrap(
|
||||
r -> handlePrivsResponse(username, request, r, listener),
|
||||
listener::onFailure
|
||||
);
|
||||
|
||||
client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
|
||||
} else { // No security enabled, just create the transform
|
||||
putTransform(request, listener);
|
||||
}
|
||||
},
|
||||
listener::onFailure
|
||||
);
|
||||
|
||||
client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
|
||||
} else { // No security enabled, just create the transform
|
||||
putTransform(request, listener);
|
||||
}
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -24,16 +24,21 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
|||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.license.License;
|
||||
import org.elasticsearch.license.LicenseUtils;
|
||||
import org.elasticsearch.license.RemoteClusterLicenseChecker;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.RemoteClusterService;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.XPackField;
|
||||
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
|
||||
import org.elasticsearch.xpack.core.transform.TransformMessages;
|
||||
import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
||||
|
@ -44,7 +49,6 @@ import org.elasticsearch.xpack.transform.TransformServices;
|
|||
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
|
||||
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
|
||||
import org.elasticsearch.xpack.transform.persistence.TransformIndex;
|
||||
import org.elasticsearch.xpack.transform.transforms.SourceDestValidator;
|
||||
import org.elasticsearch.xpack.transform.transforms.pivot.Pivot;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -65,6 +69,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
|
|||
private final PersistentTasksService persistentTasksService;
|
||||
private final Client client;
|
||||
private final TransformAuditor auditor;
|
||||
private final SourceDestValidator sourceDestValidator;
|
||||
|
||||
@Inject
|
||||
public TransportStartTransformAction(
|
||||
|
@ -76,7 +81,8 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
|
|||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
TransformServices transformServices,
|
||||
PersistentTasksService persistentTasksService,
|
||||
Client client
|
||||
Client client,
|
||||
Settings settings
|
||||
) {
|
||||
this(
|
||||
StartTransformAction.NAME,
|
||||
|
@ -88,7 +94,8 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
|
|||
indexNameExpressionResolver,
|
||||
transformServices,
|
||||
persistentTasksService,
|
||||
client
|
||||
client,
|
||||
settings
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -102,7 +109,8 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
|
|||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
TransformServices transformServices,
|
||||
PersistentTasksService persistentTasksService,
|
||||
Client client
|
||||
Client client,
|
||||
Settings settings
|
||||
) {
|
||||
super(
|
||||
name,
|
||||
|
@ -118,6 +126,15 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
|
|||
this.persistentTasksService = persistentTasksService;
|
||||
this.client = client;
|
||||
this.auditor = transformServices.getAuditor();
|
||||
this.sourceDestValidator = new SourceDestValidator(
|
||||
indexNameExpressionResolver,
|
||||
transportService.getRemoteClusterService(),
|
||||
RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings)
|
||||
? new RemoteClusterLicenseChecker(client, XPackLicenseState::isTransformAllowedForOperationMode)
|
||||
: null,
|
||||
clusterService.getNodeName(),
|
||||
License.OperationMode.BASIC.description()
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -141,8 +158,9 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
|
|||
return;
|
||||
}
|
||||
final AtomicReference<TransformTaskParams> transformTaskHolder = new AtomicReference<>();
|
||||
final AtomicReference<TransformConfig> transformConfigHolder = new AtomicReference<>();
|
||||
|
||||
// <4> Wait for the allocated task's state to STARTED
|
||||
// <5> Wait for the allocated task's state to STARTED
|
||||
ActionListener<PersistentTasksCustomMetaData.PersistentTask<TransformTaskParams>> newPersistentTaskActionListener = ActionListener
|
||||
.wrap(task -> {
|
||||
TransformTaskParams transformTask = transformTaskHolder.get();
|
||||
|
@ -155,7 +173,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
|
|||
);
|
||||
}, listener::onFailure);
|
||||
|
||||
// <3> Create the task in cluster state so that it will start executing on the node
|
||||
// <4> Create the task in cluster state so that it will start executing on the node
|
||||
ActionListener<Void> createOrGetIndexListener = ActionListener.wrap(unused -> {
|
||||
TransformTaskParams transformTask = transformTaskHolder.get();
|
||||
assert transformTask != null;
|
||||
|
@ -192,26 +210,13 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
|
|||
}, listener::onFailure);
|
||||
|
||||
// <2> If the destination index exists, start the task, otherwise deduce our mappings for the destination index and create it
|
||||
ActionListener<TransformConfig> getTransformListener = ActionListener.wrap(config -> {
|
||||
if (config.isValid() == false) {
|
||||
listener.onFailure(
|
||||
new ElasticsearchStatusException(
|
||||
TransformMessages.getMessage(TransformMessages.TRANSFORM_CONFIG_INVALID, request.getId()),
|
||||
RestStatus.BAD_REQUEST
|
||||
)
|
||||
);
|
||||
return;
|
||||
}
|
||||
// Validate source and destination indices
|
||||
SourceDestValidator.validate(config, clusterService.state(), indexNameExpressionResolver, false);
|
||||
|
||||
transformTaskHolder.set(createTransform(config.getId(), config.getVersion(), config.getFrequency()));
|
||||
final String destinationIndex = config.getDestination().getIndex();
|
||||
ActionListener<Boolean> validationListener = ActionListener.wrap(validationResponse -> {
|
||||
final String destinationIndex = transformConfigHolder.get().getDestination().getIndex();
|
||||
String[] dest = indexNameExpressionResolver.concreteIndexNames(state, IndicesOptions.lenientExpandOpen(), destinationIndex);
|
||||
|
||||
if (dest.length == 0) {
|
||||
auditor.info(request.getId(), "Creating destination index [" + destinationIndex + "] with deduced mappings.");
|
||||
createDestinationIndex(config, createOrGetIndexListener);
|
||||
createDestinationIndex(transformConfigHolder.get(), createOrGetIndexListener);
|
||||
} else {
|
||||
auditor.info(request.getId(), "Using existing destination index [" + destinationIndex + "].");
|
||||
ClientHelper.executeAsyncWithOrigin(
|
||||
|
@ -238,6 +243,29 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
|
|||
}
|
||||
}, listener::onFailure);
|
||||
|
||||
// <2> run transform validations
|
||||
ActionListener<TransformConfig> getTransformListener = ActionListener.wrap(config -> {
|
||||
if (config.isValid() == false) {
|
||||
listener.onFailure(
|
||||
new ElasticsearchStatusException(
|
||||
TransformMessages.getMessage(TransformMessages.TRANSFORM_CONFIG_INVALID, request.getId()),
|
||||
RestStatus.BAD_REQUEST
|
||||
)
|
||||
);
|
||||
return;
|
||||
}
|
||||
transformTaskHolder.set(createTransform(config.getId(), config.getVersion(), config.getFrequency()));
|
||||
transformConfigHolder.set(config);
|
||||
|
||||
sourceDestValidator.validate(
|
||||
clusterService.state(),
|
||||
config.getSource().getIndex(),
|
||||
config.getDestination().getIndex(),
|
||||
SourceDestValidator.ALL_VALIDATIONS,
|
||||
validationListener
|
||||
);
|
||||
}, listener::onFailure);
|
||||
|
||||
// <1> Get the config to verify it exists and is valid
|
||||
transformConfigManager.getTransformConfiguration(request.getId(), getTransformListener);
|
||||
}
|
||||
|
|
|
@ -23,16 +23,20 @@ import org.elasticsearch.common.inject.Inject;
|
|||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.logging.LoggerMessageFormat;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.license.License;
|
||||
import org.elasticsearch.license.LicenseUtils;
|
||||
import org.elasticsearch.license.RemoteClusterLicenseChecker;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.RemoteClusterService;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.XPackField;
|
||||
import org.elasticsearch.xpack.core.XPackPlugin;
|
||||
import org.elasticsearch.xpack.core.XPackSettings;
|
||||
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
|
||||
import org.elasticsearch.xpack.core.security.SecurityContext;
|
||||
import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction;
|
||||
import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest;
|
||||
|
@ -50,7 +54,6 @@ import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
|
|||
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
|
||||
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
|
||||
import org.elasticsearch.xpack.transform.persistence.TransformIndex;
|
||||
import org.elasticsearch.xpack.transform.transforms.SourceDestValidator;
|
||||
import org.elasticsearch.xpack.transform.transforms.pivot.Pivot;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -69,6 +72,7 @@ public class TransportUpdateTransformAction extends TransportMasterNodeAction<Re
|
|||
private final TransformConfigManager transformConfigManager;
|
||||
private final SecurityContext securityContext;
|
||||
private final TransformAuditor auditor;
|
||||
private final SourceDestValidator sourceDestValidator;
|
||||
|
||||
@Inject
|
||||
public TransportUpdateTransformAction(
|
||||
|
@ -116,6 +120,15 @@ public class TransportUpdateTransformAction extends TransportMasterNodeAction<Re
|
|||
? new SecurityContext(settings, threadPool.getThreadContext())
|
||||
: null;
|
||||
this.auditor = transformServices.getAuditor();
|
||||
this.sourceDestValidator = new SourceDestValidator(
|
||||
indexNameExpressionResolver,
|
||||
transportService.getRemoteClusterService(),
|
||||
RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings)
|
||||
? new RemoteClusterLicenseChecker(client, XPackLicenseState::isTransformAllowedForOperationMode)
|
||||
: null,
|
||||
clusterService.getNodeName(),
|
||||
License.OperationMode.BASIC.description()
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -159,7 +172,19 @@ public class TransportUpdateTransformAction extends TransportMasterNodeAction<Re
|
|||
return;
|
||||
}
|
||||
TransformConfig updatedConfig = update.apply(config);
|
||||
validateAndUpdateTransform(request, clusterState, updatedConfig, configAndVersion.v2(), listener);
|
||||
sourceDestValidator.validate(
|
||||
clusterState,
|
||||
updatedConfig.getSource().getIndex(),
|
||||
updatedConfig.getDestination().getIndex(),
|
||||
request.isDeferValidation() ? SourceDestValidator.NON_DEFERABLE_VALIDATIONS : SourceDestValidator.ALL_VALIDATIONS,
|
||||
ActionListener.wrap(
|
||||
validationResponse -> {
|
||||
checkPriviledgesAndUpdateTransform(request, clusterState, updatedConfig, configAndVersion.v2(), listener);
|
||||
},
|
||||
listener::onFailure
|
||||
)
|
||||
);
|
||||
|
||||
}, listener::onFailure));
|
||||
}
|
||||
|
||||
|
@ -196,20 +221,13 @@ public class TransportUpdateTransformAction extends TransportMasterNodeAction<Re
|
|||
}
|
||||
}
|
||||
|
||||
private void validateAndUpdateTransform(
|
||||
private void checkPriviledgesAndUpdateTransform(
|
||||
Request request,
|
||||
ClusterState clusterState,
|
||||
TransformConfig config,
|
||||
SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
|
||||
ActionListener<Response> listener
|
||||
) {
|
||||
try {
|
||||
SourceDestValidator.validate(config, clusterState, indexNameExpressionResolver, request.isDeferValidation());
|
||||
} catch (ElasticsearchStatusException ex) {
|
||||
listener.onFailure(ex);
|
||||
return;
|
||||
}
|
||||
|
||||
// Early check to verify that the user can create the destination index and can read from the source
|
||||
if (licenseState.isAuthAllowed() && request.isDeferValidation() == false) {
|
||||
final String username = securityContext.getUser().principal();
|
||||
|
|
|
@ -11,6 +11,7 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -20,12 +21,27 @@ import org.elasticsearch.xpack.transform.action.TransportPreviewTransformAction;
|
|||
public class TransportPreviewTransformActionDeprecated extends TransportPreviewTransformAction {
|
||||
|
||||
@Inject
|
||||
public TransportPreviewTransformActionDeprecated(TransportService transportService, ActionFilters actionFilters,
|
||||
Client client, ThreadPool threadPool, XPackLicenseState licenseState,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
ClusterService clusterService) {
|
||||
super(PreviewTransformActionDeprecated.NAME, transportService, actionFilters, client, threadPool, licenseState,
|
||||
indexNameExpressionResolver, clusterService);
|
||||
public TransportPreviewTransformActionDeprecated(
|
||||
TransportService transportService,
|
||||
ActionFilters actionFilters,
|
||||
Client client,
|
||||
ThreadPool threadPool,
|
||||
XPackLicenseState licenseState,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
ClusterService clusterService,
|
||||
Settings settings
|
||||
) {
|
||||
super(
|
||||
PreviewTransformActionDeprecated.NAME,
|
||||
transportService,
|
||||
actionFilters,
|
||||
client,
|
||||
threadPool,
|
||||
licenseState,
|
||||
indexNameExpressionResolver,
|
||||
clusterService,
|
||||
settings
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -31,7 +32,8 @@ public class TransportStartTransformActionDeprecated extends TransportStartTrans
|
|||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
TransformServices transformServices,
|
||||
PersistentTasksService persistentTasksService,
|
||||
Client client
|
||||
Client client,
|
||||
Settings settings
|
||||
) {
|
||||
super(
|
||||
StartTransformActionDeprecated.NAME,
|
||||
|
@ -43,7 +45,8 @@ public class TransportStartTransformActionDeprecated extends TransportStartTrans
|
|||
indexNameExpressionResolver,
|
||||
transformServices,
|
||||
persistentTasksService,
|
||||
client
|
||||
client,
|
||||
settings
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,159 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.transform.transforms;
|
||||
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.xpack.core.transform.TransformMessages;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* This class contains more complex validations in regards to how {@link TransformConfig#getSource()} and
|
||||
* {@link TransformConfig#getDestination()} relate to each other.
|
||||
*/
|
||||
public final class SourceDestValidator {
|
||||
|
||||
interface SourceDestValidation {
|
||||
boolean isDeferrable();
|
||||
void validate(TransformConfig config, ClusterState clusterState, IndexNameExpressionResolver indexNameExpressionResolver);
|
||||
}
|
||||
|
||||
private static final List<SourceDestValidation> VALIDATIONS = Arrays.asList(new SourceMissingValidation(),
|
||||
new DestinationInSourceValidation(),
|
||||
new DestinationSingleIndexValidation());
|
||||
|
||||
/**
|
||||
* Validates the DataFrameTransformConfiguration source and destination indices.
|
||||
*
|
||||
* A simple name validation is done on {@link TransformConfig#getDestination()} inside
|
||||
* {@link org.elasticsearch.xpack.core.transform.action.PutTransformAction}
|
||||
*
|
||||
* So, no need to do the name checks here.
|
||||
*
|
||||
* @param config DataFrameTransformConfig to validate
|
||||
* @param clusterState The current ClusterState
|
||||
* @param indexNameExpressionResolver A valid IndexNameExpressionResolver object
|
||||
* @throws ElasticsearchStatusException when a validation fails
|
||||
*/
|
||||
public static void validate(TransformConfig config,
|
||||
ClusterState clusterState,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
boolean shouldDefer) {
|
||||
for (SourceDestValidation validation : VALIDATIONS) {
|
||||
if (shouldDefer && validation.isDeferrable()) {
|
||||
continue;
|
||||
}
|
||||
validation.validate(config, clusterState, indexNameExpressionResolver);
|
||||
}
|
||||
}
|
||||
|
||||
static class SourceMissingValidation implements SourceDestValidation {
|
||||
|
||||
@Override
|
||||
public boolean isDeferrable() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void validate(TransformConfig config,
|
||||
ClusterState clusterState,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
for(String src : config.getSource().getIndex()) {
|
||||
String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState,
|
||||
IndicesOptions.lenientExpandOpen(),
|
||||
src);
|
||||
if (concreteNames.length == 0) {
|
||||
throw new ElasticsearchStatusException(
|
||||
TransformMessages.getMessage(TransformMessages.REST_PUT_TRANSFORM_SOURCE_INDEX_MISSING, src),
|
||||
RestStatus.BAD_REQUEST);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static class DestinationInSourceValidation implements SourceDestValidation {
|
||||
|
||||
@Override
|
||||
public boolean isDeferrable() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void validate(TransformConfig config,
|
||||
ClusterState clusterState,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
final String destIndex = config.getDestination().getIndex();
|
||||
Set<String> concreteSourceIndexNames = new HashSet<>();
|
||||
for(String src : config.getSource().getIndex()) {
|
||||
String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState,
|
||||
IndicesOptions.lenientExpandOpen(),
|
||||
src);
|
||||
if (Regex.simpleMatch(src, destIndex)) {
|
||||
throw new ElasticsearchStatusException(
|
||||
TransformMessages.getMessage(TransformMessages.REST_PUT_TRANSFORM_DEST_IN_SOURCE, destIndex, src),
|
||||
RestStatus.BAD_REQUEST);
|
||||
}
|
||||
concreteSourceIndexNames.addAll(Arrays.asList(concreteNames));
|
||||
}
|
||||
|
||||
if (concreteSourceIndexNames.contains(destIndex)) {
|
||||
throw new ElasticsearchStatusException(
|
||||
TransformMessages.getMessage(TransformMessages.REST_PUT_TRANSFORM_DEST_IN_SOURCE,
|
||||
destIndex,
|
||||
Strings.arrayToCommaDelimitedString(config.getSource().getIndex())),
|
||||
RestStatus.BAD_REQUEST
|
||||
);
|
||||
}
|
||||
|
||||
final String[] concreteDest = indexNameExpressionResolver.concreteIndexNames(clusterState,
|
||||
IndicesOptions.lenientExpandOpen(),
|
||||
destIndex);
|
||||
if (concreteDest.length > 0 && concreteSourceIndexNames.contains(concreteDest[0])) {
|
||||
throw new ElasticsearchStatusException(
|
||||
TransformMessages.getMessage(TransformMessages.REST_PUT_TRANSFORM_DEST_IN_SOURCE,
|
||||
concreteDest[0],
|
||||
Strings.arrayToCommaDelimitedString(concreteSourceIndexNames.toArray(new String[0]))),
|
||||
RestStatus.BAD_REQUEST
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static class DestinationSingleIndexValidation implements SourceDestValidation {
|
||||
|
||||
@Override
|
||||
public boolean isDeferrable() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void validate(TransformConfig config,
|
||||
ClusterState clusterState,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
final String destIndex = config.getDestination().getIndex();
|
||||
final String[] concreteDest =
|
||||
indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), destIndex);
|
||||
|
||||
if (concreteDest.length > 1) {
|
||||
throw new ElasticsearchStatusException(
|
||||
TransformMessages.getMessage(TransformMessages.REST_PUT_TRANSFORM_DEST_SINGLE_INDEX, destIndex),
|
||||
RestStatus.BAD_REQUEST
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,160 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.transform.transforms;
|
||||
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.DestConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfigTests;
|
||||
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class SourceDestValidatorTests extends ESTestCase {
|
||||
|
||||
private static final String SOURCE_1 = "source-1";
|
||||
private static final String SOURCE_2 = "source-2";
|
||||
private static final String ALIASED_DEST = "aliased-dest";
|
||||
|
||||
private static final ClusterState CLUSTER_STATE;
|
||||
|
||||
static {
|
||||
IndexMetaData source1 = IndexMetaData.builder(SOURCE_1).settings(Settings.builder()
|
||||
.put(SETTING_VERSION_CREATED, Version.CURRENT)
|
||||
.put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(SETTING_CREATION_DATE, System.currentTimeMillis()))
|
||||
.putAlias(AliasMetaData.builder("source-1-alias").build())
|
||||
.build();
|
||||
IndexMetaData source2 = IndexMetaData.builder(SOURCE_2).settings(Settings.builder()
|
||||
.put(SETTING_VERSION_CREATED, Version.CURRENT)
|
||||
.put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(SETTING_CREATION_DATE, System.currentTimeMillis()))
|
||||
.putAlias(AliasMetaData.builder("dest-alias").build())
|
||||
.build();
|
||||
IndexMetaData aliasedDest = IndexMetaData.builder(ALIASED_DEST).settings(Settings.builder()
|
||||
.put(SETTING_VERSION_CREATED, Version.CURRENT)
|
||||
.put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(SETTING_CREATION_DATE, System.currentTimeMillis()))
|
||||
.putAlias(AliasMetaData.builder("dest-alias").build())
|
||||
.build();
|
||||
ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
|
||||
state.metaData(MetaData.builder()
|
||||
.put(IndexMetaData.builder(source1).build(), false)
|
||||
.put(IndexMetaData.builder(source2).build(), false)
|
||||
.put(IndexMetaData.builder(aliasedDest).build(), false));
|
||||
CLUSTER_STATE = state.build();
|
||||
}
|
||||
|
||||
public void testCheck_GivenSimpleSourceIndexAndValidDestIndex() {
|
||||
TransformConfig config = createTransform(new SourceConfig(SOURCE_1), new DestConfig("dest", null));
|
||||
SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false);
|
||||
}
|
||||
|
||||
public void testCheck_GivenMissingConcreteSourceIndex() {
|
||||
TransformConfig config = createTransform(new SourceConfig("missing"), new DestConfig("dest", null));
|
||||
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
|
||||
() -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false));
|
||||
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
|
||||
assertThat(e.getMessage(), equalTo("Source index [missing] does not exist"));
|
||||
SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true);
|
||||
}
|
||||
|
||||
public void testCheck_GivenMissingWildcardSourceIndex() {
|
||||
TransformConfig config = createTransform(new SourceConfig("missing*"), new DestConfig("dest", null));
|
||||
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
|
||||
() -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false));
|
||||
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
|
||||
assertThat(e.getMessage(), equalTo("Source index [missing*] does not exist"));
|
||||
SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true);
|
||||
}
|
||||
|
||||
public void testCheck_GivenDestIndexSameAsSourceIndex() {
|
||||
TransformConfig config = createTransform(new SourceConfig(SOURCE_1), new DestConfig("source-1", null));
|
||||
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
|
||||
() -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false));
|
||||
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
|
||||
assertThat(e.getMessage(), equalTo("Destination index [source-1] is included in source expression [source-1]"));
|
||||
SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true);
|
||||
}
|
||||
|
||||
public void testCheck_GivenDestIndexMatchesSourceIndex() {
|
||||
TransformConfig config = createTransform(new SourceConfig("source-*"), new DestConfig(SOURCE_2, null));
|
||||
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
|
||||
() -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false));
|
||||
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
|
||||
assertThat(e.getMessage(), equalTo("Destination index [source-2] is included in source expression [source-*]"));
|
||||
SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true);
|
||||
}
|
||||
|
||||
public void testCheck_GivenDestIndexMatchesOneOfSourceIndices() {
|
||||
TransformConfig config = createTransform(new SourceConfig("source-1", "source-*"),
|
||||
new DestConfig(SOURCE_2, null));
|
||||
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
|
||||
() -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false));
|
||||
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
|
||||
assertThat(e.getMessage(), equalTo("Destination index [source-2] is included in source expression [source-*]"));
|
||||
SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true);
|
||||
}
|
||||
|
||||
public void testCheck_GivenDestIndexIsAliasThatMatchesMultipleIndices() {
|
||||
TransformConfig config = createTransform(new SourceConfig(SOURCE_1), new DestConfig("dest-alias", null));
|
||||
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
|
||||
() -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false));
|
||||
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
|
||||
assertThat(e.getMessage(),
|
||||
equalTo("Destination index [dest-alias] should refer to a single index"));
|
||||
|
||||
e = expectThrows(ElasticsearchStatusException.class,
|
||||
() -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true));
|
||||
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
|
||||
assertThat(e.getMessage(),
|
||||
equalTo("Destination index [dest-alias] should refer to a single index"));
|
||||
}
|
||||
|
||||
public void testCheck_GivenDestIndexIsAliasThatIsIncludedInSource() {
|
||||
TransformConfig config = createTransform(new SourceConfig(SOURCE_1), new DestConfig("source-1-alias", null));
|
||||
|
||||
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
|
||||
() -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false));
|
||||
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
|
||||
assertThat(e.getMessage(),
|
||||
equalTo("Destination index [source-1] is included in source expression [source-1]"));
|
||||
|
||||
SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true);
|
||||
}
|
||||
|
||||
private static TransformConfig createTransform(SourceConfig sourceConfig, DestConfig destConfig) {
|
||||
return new TransformConfig("test",
|
||||
sourceConfig,
|
||||
destConfig,
|
||||
TimeValue.timeValueSeconds(60),
|
||||
null,
|
||||
null,
|
||||
PivotConfigTests.randomPivotConfig(),
|
||||
null);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue