[7.x][ML] Reuse SourceDestValidator for data frame analytics (#50841) (#50850)

This commit removes validation logic of source and dest indices
for data frame analytics and replaces it with using the common
`SourceDestValidator` class which is already used by transforms.
This way the validations and their messages become consistent
while we reduce code.

This means that where these validations fail the error messages
will be slightly different for data frame analytics.

Backport of #50841
This commit is contained in:
Dimitris Athanasiou 2020-01-10 14:24:13 +02:00 committed by GitHub
parent 7e68989dae
commit 422422a2bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 256 additions and 385 deletions

View File

@ -49,7 +49,6 @@ public final class SourceDestValidator {
// messages
public static final String SOURCE_INDEX_MISSING = "Source index [{0}] does not exist";
public static final String SOURCE_LOWERCASE = "Source index [{0}] must be lowercase";
public static final String DEST_IN_SOURCE = "Destination index [{0}] is included in source expression [{1}]";
public static final String DEST_LOWERCASE = "Destination index [{0}] must be lowercase";
public static final String NEEDS_REMOTE_CLUSTER_SEARCH = "Source index is configured with a remote index pattern(s) [{0}]"
@ -62,6 +61,7 @@ public final class SourceDestValidator {
+ "alias [{0}], at least a [{1}] license is required, found license [{2}]";
public static final String REMOTE_CLUSTER_LICENSE_INACTIVE = "License check failed for remote cluster "
+ "alias [{0}], license is not active";
public static final String REMOTE_SOURCE_INDICES_NOT_SUPPORTED = "remote source indices are not supported";
// workaround for 7.x: remoteClusterAliases does not throw
private static final ClusterNameExpressionResolver clusterNameExpressionResolver = new ClusterNameExpressionResolver();
@ -222,7 +222,7 @@ public final class SourceDestValidator {
}
}
interface SourceDestValidation {
public interface SourceDestValidation {
void validate(Context context, ActionListener<Context> listener);
}
@ -234,18 +234,7 @@ public final class SourceDestValidator {
public static final SourceDestValidation REMOTE_SOURCE_VALIDATION = new RemoteSourceEnabledAndRemoteLicenseValidation();
public static final SourceDestValidation DESTINATION_IN_SOURCE_VALIDATION = new DestinationInSourceValidation();
public static final SourceDestValidation DESTINATION_SINGLE_INDEX_VALIDATION = new DestinationSingleIndexValidation();
// set of default validation collections, if you want to automatically benefit from new validators, use those
public static final List<SourceDestValidation> PREVIEW_VALIDATIONS = Arrays.asList(SOURCE_MISSING_VALIDATION, REMOTE_SOURCE_VALIDATION);
public static final List<SourceDestValidation> ALL_VALIDATIONS = Arrays.asList(
SOURCE_MISSING_VALIDATION,
REMOTE_SOURCE_VALIDATION,
DESTINATION_IN_SOURCE_VALIDATION,
DESTINATION_SINGLE_INDEX_VALIDATION
);
public static final List<SourceDestValidation> NON_DEFERABLE_VALIDATIONS = Arrays.asList(DESTINATION_SINGLE_INDEX_VALIDATION);
public static final SourceDestValidation REMOTE_SOURCE_NOT_SUPPORTED_VALIDATION = new RemoteSourceNotSupportedValidation();
/**
* Create a new Source Dest Validator
@ -305,10 +294,11 @@ public final class SourceDestValidator {
}
}, listener::onFailure);
// We traverse the validations in reverse order as we chain the listeners from back to front
for (int i = validations.size() - 1; i >= 0; i--) {
final SourceDestValidation validation = validations.get(i);
SourceDestValidation validation = validations.get(i);
final ActionListener<Context> previousValidationListener = validationListener;
validationListener = ActionListener.wrap(c -> { validation.validate(c, previousValidationListener); }, listener::onFailure);
validationListener = ActionListener.wrap(c -> validation.validate(c, previousValidationListener), listener::onFailure);
}
validationListener.onResponse(context);
@ -433,13 +423,13 @@ public final class SourceDestValidator {
return;
}
if (context.resolvedSource.contains(destIndex)) {
if (context.resolveSource().contains(destIndex)) {
context.addValidationError(DEST_IN_SOURCE, destIndex, Strings.arrayToCommaDelimitedString(context.getSource()));
listener.onResponse(context);
return;
}
if (context.resolvedSource.contains(context.resolveDest())) {
if (context.resolveSource().contains(context.resolveDest())) {
context.addValidationError(
DEST_IN_SOURCE,
context.resolveDest(),
@ -460,6 +450,17 @@ public final class SourceDestValidator {
}
}
static class RemoteSourceNotSupportedValidation implements SourceDestValidation {
@Override
public void validate(Context context, ActionListener<Context> listener) {
if (context.resolveRemoteSource().isEmpty() == false) {
context.addValidationError(REMOTE_SOURCE_INDICES_NOT_SUPPORTED);
}
listener.onResponse(context);
}
}
private static String getMessage(String message, Object... args) {
return new MessageFormat(message, Locale.ROOT).format(args);
}

View File

@ -18,9 +18,11 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
import java.io.IOException;
import java.util.Objects;
@ -90,14 +92,29 @@ public class PutDataFrameAnalyticsAction extends ActionType<PutDataFrameAnalytic
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException error = null;
error = checkConfigIdIsValid(config, error);
error = SourceDestValidator.validateRequest(error, config.getDest().getIndex());
error = checkNoIncludedAnalyzedFieldsAreExcludedBySourceFiltering(config, error);
return error;
}
private ActionRequestValidationException checkConfigIdIsValid(DataFrameAnalyticsConfig config,
ActionRequestValidationException error) {
if (MlStrings.isValidId(config.getId()) == false) {
error = ValidateActions.addValidationError(Messages.getMessage(Messages.INVALID_ID, DataFrameAnalyticsConfig.ID,
config.getId()), error);
}
if (!MlStrings.hasValidLengthForId(config.getId())) {
error = ValidateActions.addValidationError(Messages.getMessage(Messages.ID_TOO_LONG, DataFrameAnalyticsConfig.ID,
config.getId(), MlStrings.ID_LENGTH_LIMIT), error);
}
return error;
}
private ActionRequestValidationException checkNoIncludedAnalyzedFieldsAreExcludedBySourceFiltering(
DataFrameAnalyticsConfig config, ActionRequestValidationException error) {
if (config.getAnalyzedFields() == null) {
return null;
return error;
}
for (String analyzedInclude : config.getAnalyzedFields().includes()) {
if (config.getSource().isFieldExcluded(analyzedInclude)) {
@ -107,7 +124,7 @@ public class PutDataFrameAnalyticsAction extends ActionType<PutDataFrameAnalytic
+ DataFrameAnalyticsSource._SOURCE.getPreferredName() + "]", error);
}
}
return null;
return error;
}
@Override

View File

@ -13,15 +13,11 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Locale;
import java.util.Objects;
import static org.elasticsearch.cluster.metadata.MetaDataCreateIndexService.validateIndexOrAliasName;
public class DataFrameAnalyticsDest implements Writeable, ToXContentObject {
public static final ParseField INDEX = new ParseField("index");
@ -94,13 +90,4 @@ public class DataFrameAnalyticsDest implements Writeable, ToXContentObject {
public String getResultsField() {
return resultsField;
}
public void validate() {
if (index != null) {
validateIndexOrAliasName(index, InvalidIndexNameException::new);
if (index.toLowerCase(Locale.ROOT).equals(index) == false) {
throw new InvalidIndexNameException(index, "dest.index must be lowercase");
}
}
}
}

View File

@ -41,7 +41,9 @@ import org.elasticsearch.xpack.core.common.validation.SourceDestValidator.Remote
import org.junit.After;
import org.junit.Before;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -52,6 +54,10 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.elasticsearch.mock.orig.Mockito.when;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_IN_SOURCE_VALIDATION;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_SINGLE_INDEX_VALIDATION;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.REMOTE_SOURCE_VALIDATION;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.SOURCE_MISSING_VALIDATION;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.spy;
@ -67,6 +73,14 @@ public class SourceDestValidatorTests extends ESTestCase {
private static final String REMOTE_PLATINUM = "remote-platinum";
private static final ClusterState CLUSTER_STATE;
private static final List<SourceDestValidator.SourceDestValidation> TEST_VALIDATIONS = Arrays.asList(
SOURCE_MISSING_VALIDATION,
DESTINATION_IN_SOURCE_VALIDATION,
DESTINATION_SINGLE_INDEX_VALIDATION,
REMOTE_SOURCE_VALIDATION
);
private Client clientWithBasicLicense;
private Client clientWithExpiredBasicLicense;
private Client clientWithPlatinumLicense;
@ -184,13 +198,13 @@ public class SourceDestValidatorTests extends ESTestCase {
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
public void testCheck_GivenSimpleSourceIndexAndValidDestIndex() throws InterruptedException {
public void testValidate_GivenSimpleSourceIndexAndValidDestIndex() throws InterruptedException {
assertValidation(
listener -> simpleNonRemoteValidator.validate(
CLUSTER_STATE,
new String[] { SOURCE_1 },
"dest",
SourceDestValidator.ALL_VALIDATIONS,
TEST_VALIDATIONS,
listener
),
true,
@ -204,7 +218,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] {},
"dest",
SourceDestValidator.ALL_VALIDATIONS,
TEST_VALIDATIONS,
listener
),
(Boolean) null,
@ -221,7 +235,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { "missing" },
"dest",
SourceDestValidator.ALL_VALIDATIONS,
TEST_VALIDATIONS,
listener
),
(Boolean) null,
@ -236,7 +250,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { "missing" },
"dest",
SourceDestValidator.NON_DEFERABLE_VALIDATIONS,
Collections.emptyList(),
listener
),
true,
@ -250,7 +264,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { SOURCE_1, "missing" },
"dest",
SourceDestValidator.ALL_VALIDATIONS,
TEST_VALIDATIONS,
listener
),
(Boolean) null,
@ -265,7 +279,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { SOURCE_1, "missing" },
"dest",
SourceDestValidator.NON_DEFERABLE_VALIDATIONS,
Collections.emptyList(),
listener
),
true,
@ -279,7 +293,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { SOURCE_1, "wildcard*", "missing" },
"dest",
SourceDestValidator.ALL_VALIDATIONS,
TEST_VALIDATIONS,
listener
),
(Boolean) null,
@ -294,7 +308,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { SOURCE_1, "wildcard*", "missing" },
"dest",
SourceDestValidator.NON_DEFERABLE_VALIDATIONS,
Collections.emptyList(),
listener
),
true,
@ -308,7 +322,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { "wildcard*" },
"dest",
SourceDestValidator.ALL_VALIDATIONS,
TEST_VALIDATIONS,
listener
),
true,
@ -322,7 +336,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { SOURCE_1 },
SOURCE_1,
SourceDestValidator.ALL_VALIDATIONS,
TEST_VALIDATIONS,
listener
),
(Boolean) null,
@ -340,7 +354,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { SOURCE_1 },
SOURCE_1,
SourceDestValidator.NON_DEFERABLE_VALIDATIONS,
Collections.emptyList(),
listener
),
true,
@ -354,7 +368,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { "source-*" },
SOURCE_2,
SourceDestValidator.ALL_VALIDATIONS,
TEST_VALIDATIONS,
listener
),
(Boolean) null,
@ -372,7 +386,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { "source-*" },
SOURCE_2,
SourceDestValidator.NON_DEFERABLE_VALIDATIONS,
Collections.emptyList(),
listener
),
true,
@ -386,7 +400,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { "source-1", "source-*" },
SOURCE_2,
SourceDestValidator.ALL_VALIDATIONS,
TEST_VALIDATIONS,
listener
),
(Boolean) null,
@ -404,7 +418,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { "source-1", "source-*" },
SOURCE_2,
SourceDestValidator.NON_DEFERABLE_VALIDATIONS,
Collections.emptyList(),
listener
),
true,
@ -418,7 +432,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { "source-1", "source-*", "sou*" },
SOURCE_2,
SourceDestValidator.ALL_VALIDATIONS,
TEST_VALIDATIONS,
listener
),
(Boolean) null,
@ -442,7 +456,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { SOURCE_1 },
DEST_ALIAS,
SourceDestValidator.ALL_VALIDATIONS,
TEST_VALIDATIONS,
listener
),
(Boolean) null,
@ -464,21 +478,11 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { SOURCE_1 },
DEST_ALIAS,
SourceDestValidator.NON_DEFERABLE_VALIDATIONS,
Collections.emptyList(),
listener
),
(Boolean) null,
e -> {
assertEquals(1, e.validationErrors().size());
assertThat(
e.validationErrors().get(0),
equalTo(
"no write index is defined for alias [dest-alias]. "
+ "The write index may be explicitly disabled using is_write_index=false or the alias points "
+ "to multiple indices without one being designated as a write index"
)
);
}
true,
null
);
}
@ -488,7 +492,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { SOURCE_1 },
ALIAS_READ_WRITE_DEST,
SourceDestValidator.ALL_VALIDATIONS,
TEST_VALIDATIONS,
listener
),
(Boolean) null,
@ -514,7 +518,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { SOURCE_1 },
SOURCE_1_ALIAS,
SourceDestValidator.ALL_VALIDATIONS,
TEST_VALIDATIONS,
listener
),
(Boolean) null,
@ -532,7 +536,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { SOURCE_1 },
SOURCE_1_ALIAS,
SourceDestValidator.NON_DEFERABLE_VALIDATIONS,
Collections.emptyList(),
listener
),
true,
@ -546,7 +550,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { SOURCE_1, "missing" },
SOURCE_1_ALIAS,
SourceDestValidator.ALL_VALIDATIONS,
TEST_VALIDATIONS,
listener
),
(Boolean) null,

View File

@ -7,14 +7,10 @@ package org.elasticsearch.xpack.core.ml.dataframe;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
import static org.hamcrest.Matchers.equalTo;
public class DataFrameAnalyticsDestTests extends AbstractSerializingTestCase<DataFrameAnalyticsDest> {
@Override
@ -37,19 +33,4 @@ public class DataFrameAnalyticsDestTests extends AbstractSerializingTestCase<Dat
protected Writeable.Reader<DataFrameAnalyticsDest> instanceReader() {
return DataFrameAnalyticsDest::new;
}
public void testValidate_GivenIndexWithFunkyChars() {
expectThrows(InvalidIndexNameException.class, () -> new DataFrameAnalyticsDest("<script>foo", null).validate());
}
public void testValidate_GivenIndexWithUppercaseChars() {
InvalidIndexNameException e = expectThrows(InvalidIndexNameException.class,
() -> new DataFrameAnalyticsDest("Foo", null).validate());
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("Invalid index name [Foo], dest.index must be lowercase"));
}
public void testValidate_GivenValidIndexName() {
new DataFrameAnalyticsDest("foo_bar_42", null).validate();
}
}

View File

@ -50,6 +50,7 @@ integTest.runner {
'ml/data_frame_analytics_crud/Test put config with dest index same as source index',
'ml/data_frame_analytics_crud/Test put config with dest index matching multiple indices',
'ml/data_frame_analytics_crud/Test put config with dest index included in source via alias',
'ml/data_frame_analytics_crud/Test put config with remote source index',
'ml/data_frame_analytics_crud/Test put config with unknown top level field',
'ml/data_frame_analytics_crud/Test put config with unknown field in outlier detection analysis',
'ml/data_frame_analytics_crud/Test put config given analyzed_fields include field excluded by source',
@ -58,6 +59,7 @@ integTest.runner {
'ml/data_frame_analytics_crud/Test put config given source with empty string in index array',
'ml/data_frame_analytics_crud/Test put config given source without index',
'ml/data_frame_analytics_crud/Test put config given missing dest',
'ml/data_frame_analytics_crud/Test put config given dest index contains uppercase chars',
'ml/data_frame_analytics_crud/Test put config given dest with empty index',
'ml/data_frame_analytics_crud/Test put config given dest without index',
'ml/data_frame_analytics_crud/Test put config given missing analysis',

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.license.License;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.tasks.Task;
@ -32,14 +33,13 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction;
import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest;
@ -47,7 +47,7 @@ import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesResponse;
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
import org.elasticsearch.xpack.core.security.authz.permission.ResourcePrivileges;
import org.elasticsearch.xpack.core.security.support.Exceptions;
import org.elasticsearch.xpack.ml.dataframe.SourceDestValidator;
import org.elasticsearch.xpack.ml.dataframe.SourceDestValidations;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
@ -66,6 +66,7 @@ public class TransportPutDataFrameAnalyticsAction
private final SecurityContext securityContext;
private final Client client;
private final DataFrameAnalyticsAuditor auditor;
private final SourceDestValidator sourceDestValidator;
private volatile ByteSizeValue maxModelMemoryLimit;
@ -86,6 +87,14 @@ public class TransportPutDataFrameAnalyticsAction
maxModelMemoryLimit = MachineLearningField.MAX_MODEL_MEMORY_LIMIT.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearningField.MAX_MODEL_MEMORY_LIMIT, this::setMaxModelMemoryLimit);
this.sourceDestValidator = new SourceDestValidator(
indexNameExpressionResolver,
transportService.getRemoteClusterService(),
null,
clusterService.getNodeName(),
License.OperationMode.PLATINUM.description()
);
}
private void setMaxModelMemoryLimit(ByteSizeValue maxModelMemoryLimit) {
@ -110,9 +119,21 @@ public class TransportPutDataFrameAnalyticsAction
@Override
protected void masterOperation(PutDataFrameAnalyticsAction.Request request, ClusterState state,
ActionListener<PutDataFrameAnalyticsAction.Response> listener) {
validateConfig(request.getConfig());
final DataFrameAnalyticsConfig config = request.getConfig();
ActionListener<Boolean> sourceDestValidationListener = ActionListener.wrap(
aBoolean -> putValidatedConfig(config, listener),
listener::onFailure
);
sourceDestValidator.validate(clusterService.state(), config.getSource().getIndex(), config.getDest().getIndex(),
SourceDestValidations.ALL_VALIDATIONS, sourceDestValidationListener);
}
private void putValidatedConfig(DataFrameAnalyticsConfig config, ActionListener<PutDataFrameAnalyticsAction.Response> listener) {
DataFrameAnalyticsConfig preparedForPutConfig =
new DataFrameAnalyticsConfig.Builder(request.getConfig(), maxModelMemoryLimit)
new DataFrameAnalyticsConfig.Builder(config, maxModelMemoryLimit)
.setCreateTime(Instant.now())
.setVersion(Version.CURRENT)
.build();
@ -202,19 +223,6 @@ public class TransportPutDataFrameAnalyticsAction
listener::onFailure));
}
private void validateConfig(DataFrameAnalyticsConfig config) {
if (MlStrings.isValidId(config.getId()) == false) {
throw ExceptionsHelper.badRequestException(Messages.getMessage(Messages.INVALID_ID, DataFrameAnalyticsConfig.ID,
config.getId()));
}
if (!MlStrings.hasValidLengthForId(config.getId())) {
throw ExceptionsHelper.badRequestException("id [{}] is too long; must not contain more than {} characters", config.getId(),
MlStrings.ID_LENGTH_LIMIT);
}
config.getDest().validate();
new SourceDestValidator(clusterService.state(), indexNameExpressionResolver).check(config);
}
@Override
protected void doExecute(Task task, PutDataFrameAnalyticsAction.Request request,
ActionListener<PutDataFrameAnalyticsAction.Response> listener) {

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.license.License;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.AllocatedPersistentTask;
@ -46,6 +47,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction;
@ -63,7 +65,7 @@ import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
import org.elasticsearch.xpack.ml.dataframe.MappingsMerger;
import org.elasticsearch.xpack.ml.dataframe.SourceDestValidator;
import org.elasticsearch.xpack.ml.dataframe.SourceDestValidations;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
import org.elasticsearch.xpack.ml.dataframe.extractor.ExtractedFieldsDetectorFactory;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
@ -100,6 +102,7 @@ public class TransportStartDataFrameAnalyticsAction
private final DataFrameAnalyticsConfigProvider configProvider;
private final MlMemoryTracker memoryTracker;
private final DataFrameAnalyticsAuditor auditor;
private final SourceDestValidator sourceDestValidator;
@Inject
public TransportStartDataFrameAnalyticsAction(TransportService transportService, Client client, ClusterService clusterService,
@ -116,6 +119,14 @@ public class TransportStartDataFrameAnalyticsAction
this.configProvider = configProvider;
this.memoryTracker = memoryTracker;
this.auditor = Objects.requireNonNull(auditor);
this.sourceDestValidator = new SourceDestValidator(
indexNameExpressionResolver,
transportService.getRemoteClusterService(),
null,
clusterService.getNodeName(),
License.OperationMode.PLATINUM.description()
);
}
@Override
@ -227,13 +238,13 @@ public class TransportStartDataFrameAnalyticsAction
private void getStartContext(String id, ActionListener<StartContext> finalListener) {
// Step 6. Validate that there are analyzable data in the source index
// Step 7. Validate that there are analyzable data in the source index
ActionListener<StartContext> validateMappingsMergeListener = ActionListener.wrap(
startContext -> validateSourceIndexHasRows(startContext, finalListener),
finalListener::onFailure
);
// Step 5. Validate mappings can be merged
// Step 6. Validate mappings can be merged
ActionListener<StartContext> toValidateMappingsListener = ActionListener.wrap(
startContext -> MappingsMerger.mergeMappings(client, startContext.config.getHeaders(),
startContext.config.getSource(), ActionListener.wrap(
@ -241,7 +252,7 @@ public class TransportStartDataFrameAnalyticsAction
finalListener::onFailure
);
// Step 4. Validate dest index is empty if task is starting for first time
// Step 5. Validate dest index is empty if task is starting for first time
ActionListener<StartContext> toValidateDestEmptyListener = ActionListener.wrap(
startContext -> {
switch (startContext.startingState) {
@ -265,22 +276,30 @@ public class TransportStartDataFrameAnalyticsAction
finalListener::onFailure
);
// Step 3. Validate source and dest; check data extraction is possible
// Step 4. Check data extraction is possible
ActionListener<StartContext> toValidateExtractionPossibleListener = ActionListener.wrap(
startContext -> {
new ExtractedFieldsDetectorFactory(client).createFromSource(startContext.config, ActionListener.wrap(
extractedFieldsDetector -> {
startContext.extractedFields = extractedFieldsDetector.detect().v1();
toValidateDestEmptyListener.onResponse(startContext);
},
finalListener::onFailure)
);
},
finalListener::onFailure
);
// Step 3. Validate source and dest
ActionListener<StartContext> startContextListener = ActionListener.wrap(
startContext -> {
// Validate the query parses
startContext.config.getSource().getParsedQuery();
// Validate source/dest are valid
new SourceDestValidator(clusterService.state(), indexNameExpressionResolver).check(startContext.config);
// Validate extraction is possible
new ExtractedFieldsDetectorFactory(client).createFromSource(startContext.config, ActionListener.wrap(
extractedFieldsDetector -> {
startContext.extractedFields = extractedFieldsDetector.detect().v1();
toValidateDestEmptyListener.onResponse(startContext);
},
finalListener::onFailure));
sourceDestValidator.validate(clusterService.state(), startContext.config.getSource().getIndex(),
startContext.config.getDest().getIndex(), SourceDestValidations.ALL_VALIDATIONS, ActionListener.wrap(
aBoolean -> toValidateExtractionPossibleListener.onResponse(startContext), finalListener::onFailure));
},
finalListener::onFailure
);

View File

@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.dataframe;
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
import java.util.Arrays;
import java.util.List;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_IN_SOURCE_VALIDATION;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_SINGLE_INDEX_VALIDATION;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.REMOTE_SOURCE_NOT_SUPPORTED_VALIDATION;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.SOURCE_MISSING_VALIDATION;
public final class SourceDestValidations {
private SourceDestValidations() {}
public static final List<SourceDestValidator.SourceDestValidation> ALL_VALIDATIONS = Arrays.asList(
SOURCE_MISSING_VALIDATION,
DESTINATION_SINGLE_INDEX_VALIDATION,
DESTINATION_IN_SOURCE_VALIDATION,
REMOTE_SOURCE_NOT_SUPPORTED_VALIDATION
);
}

View File

@ -1,68 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.dataframe;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
public class SourceDestValidator {
private final ClusterState clusterState;
private final IndexNameExpressionResolver indexNameExpressionResolver;
public SourceDestValidator(ClusterState clusterState, IndexNameExpressionResolver indexNameExpressionResolver) {
this.clusterState = Objects.requireNonNull(clusterState);
this.indexNameExpressionResolver = Objects.requireNonNull(indexNameExpressionResolver);
}
public void check(DataFrameAnalyticsConfig config) {
String[] sourceIndex = config.getSource().getIndex();
String destIndex = config.getDest().getIndex();
String[] sourceExpressions = Arrays.stream(sourceIndex)
.map(index -> Strings.tokenizeToStringArray(index, ","))
.flatMap(Arrays::stream)
.toArray(String[]::new);
for (String sourceExpression : sourceExpressions) {
if (Regex.simpleMatch(sourceExpression, destIndex)) {
throw ExceptionsHelper.badRequestException("Destination index [{}] must not be included in source index [{}]",
destIndex, sourceExpression);
}
}
Set<String> concreteSourceIndexNames = new HashSet<>(Arrays.asList(indexNameExpressionResolver.concreteIndexNames(clusterState,
IndicesOptions.lenientExpandOpen(), sourceExpressions)));
if (concreteSourceIndexNames.isEmpty()) {
throw ExceptionsHelper.badRequestException("No index matches source index {}", Arrays.toString(sourceIndex));
}
final String[] concreteDestIndexNames = indexNameExpressionResolver.concreteIndexNames(clusterState,
IndicesOptions.lenientExpandOpen(), destIndex);
if (concreteDestIndexNames.length > 1) {
// In case it is an alias, it may match multiple indices
throw ExceptionsHelper.badRequestException("Destination index [{}] should match a single index; matches {}", destIndex,
Arrays.toString(concreteDestIndexNames));
}
if (concreteDestIndexNames.length == 1 && concreteSourceIndexNames.contains(concreteDestIndexNames[0])) {
// In case the dest index is an alias, we need to check the concrete index is not matched by source
throw ExceptionsHelper.badRequestException("Destination index [{}], which is an alias for [{}], " +
"must not be included in source index {}", destIndex, concreteDestIndexNames[0], Arrays.toString(sourceIndex));
}
}
}

View File

@ -1,188 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.dataframe;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetection;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.hamcrest.Matchers.equalTo;
public class SourceDestValidatorTests extends ESTestCase {
private static final String SOURCE_1 = "source-1";
private static final String SOURCE_2 = "source-2";
private static final String ALIASED_DEST = "aliased-dest";
private static final ClusterState CLUSTER_STATE;
static {
IndexMetaData source1 = IndexMetaData.builder(SOURCE_1).settings(Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)
.put(SETTING_CREATION_DATE, System.currentTimeMillis()))
.putAlias(AliasMetaData.builder("source-1-alias").build())
.build();
IndexMetaData source2 = IndexMetaData.builder(SOURCE_2).settings(Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)
.put(SETTING_CREATION_DATE, System.currentTimeMillis()))
.putAlias(AliasMetaData.builder("dest-alias").build())
.build();
IndexMetaData aliasedDest = IndexMetaData.builder(ALIASED_DEST).settings(Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)
.put(SETTING_CREATION_DATE, System.currentTimeMillis()))
.putAlias(AliasMetaData.builder("dest-alias").build())
.build();
ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
state.metaData(MetaData.builder()
.put(IndexMetaData.builder(source1).build(), false)
.put(IndexMetaData.builder(source2).build(), false)
.put(IndexMetaData.builder(aliasedDest).build(), false));
CLUSTER_STATE = state.build();
}
public void testCheck_GivenSimpleSourceIndexAndValidDestIndex() {
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId("test")
.setSource(createSource("source-1"))
.setDest(new DataFrameAnalyticsDest("dest", null))
.setAnalysis(new OutlierDetection.Builder().build())
.build();
SourceDestValidator validator = new SourceDestValidator(CLUSTER_STATE, new IndexNameExpressionResolver());
validator.check(config);
}
public void testCheck_GivenMissingConcreteSourceIndex() {
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId("test")
.setSource(createSource("missing"))
.setDest(new DataFrameAnalyticsDest("dest", null))
.setAnalysis(new OutlierDetection.Builder().build())
.build();
SourceDestValidator validator = new SourceDestValidator(CLUSTER_STATE, new IndexNameExpressionResolver());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> validator.check(config));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("No index matches source index [missing]"));
}
public void testCheck_GivenMissingWildcardSourceIndex() {
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId("test")
.setSource(createSource("missing*"))
.setDest(new DataFrameAnalyticsDest("dest", null))
.setAnalysis(new OutlierDetection.Builder().build())
.build();
SourceDestValidator validator = new SourceDestValidator(CLUSTER_STATE, new IndexNameExpressionResolver());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> validator.check(config));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("No index matches source index [missing*]"));
}
public void testCheck_GivenDestIndexSameAsSourceIndex() {
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId("test")
.setSource(createSource("source-1"))
.setDest(new DataFrameAnalyticsDest("source-1", null))
.setAnalysis(new OutlierDetection.Builder().build())
.build();
SourceDestValidator validator = new SourceDestValidator(CLUSTER_STATE, new IndexNameExpressionResolver());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> validator.check(config));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("Destination index [source-1] must not be included in source index [source-1]"));
}
public void testCheck_GivenDestIndexMatchesSourceIndex() {
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId("test")
.setSource(createSource("source-*"))
.setDest(new DataFrameAnalyticsDest(SOURCE_2, null))
.setAnalysis(new OutlierDetection.Builder().build())
.build();
SourceDestValidator validator = new SourceDestValidator(CLUSTER_STATE, new IndexNameExpressionResolver());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> validator.check(config));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("Destination index [source-2] must not be included in source index [source-*]"));
}
public void testCheck_GivenDestIndexMatchesOneOfSourceIndices() {
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId("test")
.setSource(createSource("source-1,source-*"))
.setDest(new DataFrameAnalyticsDest(SOURCE_2, null))
.setAnalysis(new OutlierDetection.Builder().build())
.build();
SourceDestValidator validator = new SourceDestValidator(CLUSTER_STATE, new IndexNameExpressionResolver());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> validator.check(config));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("Destination index [source-2] must not be included in source index [source-*]"));
}
public void testCheck_GivenDestIndexIsAliasThatMatchesMultipleIndices() {
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId("test")
.setSource(createSource(SOURCE_1))
.setDest(new DataFrameAnalyticsDest("dest-alias", null))
.setAnalysis(new OutlierDetection.Builder().build())
.build();
SourceDestValidator validator = new SourceDestValidator(CLUSTER_STATE, new IndexNameExpressionResolver());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> validator.check(config));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(),
equalTo("Destination index [dest-alias] should match a single index; matches [source-2, aliased-dest]"));
}
public void testCheck_GivenDestIndexIsAliasThatIsIncludedInSource() {
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId("test")
.setSource(createSource("source-1"))
.setDest(new DataFrameAnalyticsDest("source-1-alias", null))
.setAnalysis(new OutlierDetection.Builder().build())
.build();
SourceDestValidator validator = new SourceDestValidator(CLUSTER_STATE, new IndexNameExpressionResolver());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> validator.check(config));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(),
equalTo("Destination index [source-1-alias], which is an alias for [source-1], " +
"must not be included in source index [source-1]"));
}
private static DataFrameAnalyticsSource createSource(String... index) {
return new DataFrameAnalyticsSource(index, null, null);
}
}

View File

@ -268,6 +268,24 @@ setup:
"analysis": {"outlier_detection":{}}
}
---
"Test put config given dest index contains uppercase chars":
- do:
catch: /.*reason=Validation Failed.* Destination index \[Foo\] must be lowercase;.*/
ml.put_data_frame_analytics:
id: "config-given-dest-index-uppercase"
body: >
{
"source": {
"index": "index-source"
},
"dest": {
"index": "Foo"
},
"analysis": {"outlier_detection":{}}
}
---
"Test put config with pattern dest index name":
@ -290,7 +308,7 @@ setup:
"Test put config with missing concrete source index":
- do:
catch: /No index matches source index \[missing]/
catch: /.*reason=Validation Failed.* no such index \[missing\]/
ml.put_data_frame_analytics:
id: "config-with-missing-concrete-source-index"
body: >
@ -308,7 +326,6 @@ setup:
"Test put config with missing wildcard source index":
- do:
catch: /No index matches source index \[missing\*\]/
ml.put_data_frame_analytics:
id: "config-with-missing-wildcard-source-index"
body: >
@ -321,12 +338,13 @@ setup:
},
"analysis": {"outlier_detection":{}}
}
- match: { id: "config-with-missing-wildcard-source-index" }
---
"Test put config with dest index same as source index":
- do:
catch: /Destination index \[index-source\] must not be included in source index \[index-source\]/
catch: /.*reason=Validation Failed.* Destination index \[index-source\] is included in source expression \[index-source\]/
ml.put_data_frame_analytics:
id: "config-with-same-source-dest-index"
body: >
@ -362,7 +380,7 @@ setup:
name: multiple-dest-index
- do:
catch: /Destination index \[multiple-dest-index\] should match a single index; matches \[multiple-dest-index-[12], multiple-dest-index-[12]\]/
catch: /.*reason=Validation Failed.* no write index is defined for alias \[multiple-dest-index\].*/
ml.put_data_frame_analytics:
id: "config-with-dest-index-matching-multiple-indices"
body: >
@ -389,7 +407,7 @@ setup:
name: dest-alias
- do:
catch: /Destination index \[dest-alias\], which is an alias for \[another-source-index\], must not be included in source index \[another-source-index\]/
catch: /.*reason=Validation Failed.* Destination index \[another-source-index\] is included in source expression \[another-source-index\]/
ml.put_data_frame_analytics:
id: "config-with-dest-index-included-in-source-via-alias"
body: >
@ -403,6 +421,24 @@ setup:
"analysis": {"outlier_detection":{}}
}
---
"Test put config with remote source index":
- do:
catch: /.*reason=Validation Failed.* remote source indices are not supported/
ml.put_data_frame_analytics:
id: "config-with-missing-concrete-source-index"
body: >
{
"source": {
"index": "remote_cluster:foo"
},
"dest": {
"index": "index-dest"
},
"analysis": {"outlier_detection":{}}
}
---
"Test put config with unknown top level field":

View File

@ -31,7 +31,7 @@
index: missing
- do:
catch: /No index matches source index \[missing]/
catch: /.*reason=Validation Failed.* no such index \[missing\]/
ml.start_data_frame_analytics:
id: "missing_index"

View File

@ -52,6 +52,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.transform.transforms.pivot.AggregationResultUtils;
import org.elasticsearch.xpack.transform.transforms.pivot.Pivot;
import org.elasticsearch.xpack.transform.utils.SourceDestValidations;
import java.util.ArrayList;
import java.util.HashMap;
@ -139,7 +140,7 @@ public class TransportPreviewTransformAction extends
clusterState,
config.getSource().getIndex(),
config.getDestination().getIndex(),
SourceDestValidator.PREVIEW_VALIDATIONS,
SourceDestValidations.PREVIEW_VALIDATIONS,
ActionListener.wrap(r -> {
Pivot pivot = new Pivot(config.getPivotConfig());

View File

@ -56,6 +56,7 @@ import org.elasticsearch.xpack.transform.TransformServices;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.transforms.pivot.Pivot;
import org.elasticsearch.xpack.transform.utils.SourceDestValidations;
import java.io.IOException;
import java.time.Instant;
@ -232,7 +233,7 @@ public class TransportPutTransformAction extends TransportMasterNodeAction<Reque
clusterState,
config.getSource().getIndex(),
config.getDestination().getIndex(),
request.isDeferValidation() ? SourceDestValidator.NON_DEFERABLE_VALIDATIONS : SourceDestValidator.ALL_VALIDATIONS,
request.isDeferValidation() ? SourceDestValidations.NON_DEFERABLE_VALIDATIONS : SourceDestValidations.ALL_VALIDATIONS,
ActionListener.wrap(
validationResponse -> {
// Early check to verify that the user can create the destination index and can read from the source

View File

@ -51,6 +51,7 @@ import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.persistence.TransformIndex;
import org.elasticsearch.xpack.transform.transforms.pivot.Pivot;
import org.elasticsearch.xpack.transform.utils.SourceDestValidations;
import java.io.IOException;
import java.time.Clock;
@ -277,7 +278,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
clusterService.state(),
config.getSource().getIndex(),
config.getDestination().getIndex(),
SourceDestValidator.ALL_VALIDATIONS,
SourceDestValidations.ALL_VALIDATIONS,
validationListener
);
}, listener::onFailure);

View File

@ -55,6 +55,7 @@ import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.persistence.TransformIndex;
import org.elasticsearch.xpack.transform.transforms.pivot.Pivot;
import org.elasticsearch.xpack.transform.utils.SourceDestValidations;
import java.io.IOException;
import java.time.Clock;
@ -176,7 +177,7 @@ public class TransportUpdateTransformAction extends TransportMasterNodeAction<Re
clusterState,
updatedConfig.getSource().getIndex(),
updatedConfig.getDestination().getIndex(),
request.isDeferValidation() ? SourceDestValidator.NON_DEFERABLE_VALIDATIONS : SourceDestValidator.ALL_VALIDATIONS,
request.isDeferValidation() ? SourceDestValidations.NON_DEFERABLE_VALIDATIONS : SourceDestValidations.ALL_VALIDATIONS,
ActionListener.wrap(
validationResponse -> {
checkPriviledgesAndUpdateTransform(request, clusterState, updatedConfig, configAndVersion.v2(), listener);

View File

@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.transform.utils;
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_IN_SOURCE_VALIDATION;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_SINGLE_INDEX_VALIDATION;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.REMOTE_SOURCE_VALIDATION;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.SOURCE_MISSING_VALIDATION;
/**
* Packs together useful sets of validations in the context transforms
*/
public final class SourceDestValidations {
private SourceDestValidations() {}
public static final List<SourceDestValidator.SourceDestValidation> PREVIEW_VALIDATIONS = Arrays.asList(
SOURCE_MISSING_VALIDATION, REMOTE_SOURCE_VALIDATION);
public static final List<SourceDestValidator.SourceDestValidation> ALL_VALIDATIONS = Arrays.asList(
SOURCE_MISSING_VALIDATION,
REMOTE_SOURCE_VALIDATION,
DESTINATION_IN_SOURCE_VALIDATION,
DESTINATION_SINGLE_INDEX_VALIDATION
);
public static final List<SourceDestValidator.SourceDestValidation> NON_DEFERABLE_VALIDATIONS = Collections.singletonList(
DESTINATION_SINGLE_INDEX_VALIDATION
);
}