[ML][Data Frame] adds index validations to _start data frame transform (#44191) (#44227)

* [ML][Data Frame] adds index validations to _start data frame transform

* addressing pr comments
This commit is contained in:
Benjamin Trent 2019-07-11 12:50:50 -05:00 committed by GitHub
parent 2768662822
commit 40cc081ad3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 255 additions and 52 deletions

View File

@ -64,6 +64,10 @@ public class PutDataFrameTransformAction extends ActionType<AcknowledgedResponse
return new Request(DataFrameTransformConfig.fromXContent(parser, id, false)); return new Request(DataFrameTransformConfig.fromXContent(parser, id, false));
} }
/**
* More complex validations with how {@link DataFrameTransformConfig#getDestination()} and
* {@link DataFrameTransformConfig#getSource()} relate are done in the transport handler.
*/
@Override @Override
public ActionRequestValidationException validate() { public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null; ActionRequestValidationException validationException = null;

View File

@ -23,12 +23,10 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ClientHelper;
@ -48,16 +46,14 @@ import org.elasticsearch.xpack.core.security.authz.permission.ResourcePrivileges
import org.elasticsearch.xpack.core.security.support.Exceptions; import org.elasticsearch.xpack.core.security.support.Exceptions;
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.SourceDestValidator;
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
import java.io.IOException; import java.io.IOException;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class TransportPutDataFrameTransformAction public class TransportPutDataFrameTransformAction
@ -128,58 +124,19 @@ public class TransportPutDataFrameTransformAction
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_TRANSFORM_EXISTS, transformId))); DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_TRANSFORM_EXISTS, transformId)));
return; return;
} }
final String destIndex = config.getDestination().getIndex(); try {
Set<String> concreteSourceIndexNames = new HashSet<>(); SourceDestValidator.check(config, clusterState, indexNameExpressionResolver);
for(String src : config.getSource().getIndex()) { } catch (ElasticsearchStatusException ex) {
String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), src); listener.onFailure(ex);
if (concreteNames.length == 0) {
listener.onFailure(new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING, src),
RestStatus.BAD_REQUEST));
return;
}
if (Regex.simpleMatch(src, destIndex)) {
listener.onFailure(new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, destIndex, src),
RestStatus.BAD_REQUEST
));
return;
}
concreteSourceIndexNames.addAll(Arrays.asList(concreteNames));
}
if (concreteSourceIndexNames.contains(destIndex)) {
listener.onFailure(new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE,
destIndex,
Strings.arrayToCommaDelimitedString(config.getSource().getIndex())),
RestStatus.BAD_REQUEST
));
return;
}
final String[] concreteDest =
indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), destIndex);
if (concreteDest.length > 1) {
listener.onFailure(new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_SINGLE_INDEX, destIndex),
RestStatus.BAD_REQUEST
));
return;
}
if (concreteDest.length > 0 && concreteSourceIndexNames.contains(concreteDest[0])) {
listener.onFailure(new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE,
concreteDest[0],
Strings.arrayToCommaDelimitedString(concreteSourceIndexNames.toArray(new String[0]))),
RestStatus.BAD_REQUEST
));
return; return;
} }
// Early check to verify that the user can create the destination index and can read from the source // Early check to verify that the user can create the destination index and can read from the source
if (licenseState.isAuthAllowed()) { if (licenseState.isAuthAllowed()) {
final String destIndex = config.getDestination().getIndex();
final String[] concreteDest = indexNameExpressionResolver.concreteIndexNames(clusterState,
IndicesOptions.lenientExpandOpen(),
config.getDestination().getIndex());
final String username = securityContext.getUser().principal(); final String username = securityContext.getUser().principal();
List<String> srcPrivileges = new ArrayList<>(2); List<String> srcPrivileges = new ArrayList<>(2);
srcPrivileges.add("read"); srcPrivileges.add("read");

View File

@ -44,6 +44,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskS
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.persistence.DataframeIndex; import org.elasticsearch.xpack.dataframe.persistence.DataframeIndex;
import org.elasticsearch.xpack.dataframe.transforms.SourceDestValidator;
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
import java.io.IOException; import java.io.IOException;
@ -184,6 +185,8 @@ public class TransportStartDataFrameTransformAction extends
)); ));
return; return;
} }
// Validate source and destination indices
SourceDestValidator.check(config, clusterService.state(), indexNameExpressionResolver);
transformTaskHolder.set(createDataFrameTransform(config.getId(), config.getVersion(), config.getFrequency())); transformTaskHolder.set(createDataFrameTransform(config.getId(), config.getVersion(), config.getFrequency()));
final String destinationIndex = config.getDestination().getIndex(); final String destinationIndex = config.getDestination().getIndex();

View File

@ -0,0 +1,92 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.transforms;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
/**
* This class contains more complex validations in regards to how {@link DataFrameTransformConfig#getSource()} and
* {@link DataFrameTransformConfig#getDestination()} relate to each other.
*/
public final class SourceDestValidator {
private SourceDestValidator() {}
/**
* Validates the DataFrameTransformConfiguration source and destination indices.
*
* A simple name validation is done on {@link DataFrameTransformConfig#getDestination()} inside
* {@link org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction}
*
* So, no need to do the name checks here.
*
* @param config DataFrameTransformConfig to validate
* @param clusterState The current ClusterState
* @param indexNameExpressionResolver A valid IndexNameExpressionResolver object
* @throws ElasticsearchStatusException when a validation fails
*/
public static void check(DataFrameTransformConfig config,
ClusterState clusterState,
IndexNameExpressionResolver indexNameExpressionResolver) {
final String destIndex = config.getDestination().getIndex();
Set<String> concreteSourceIndexNames = new HashSet<>();
for(String src : config.getSource().getIndex()) {
String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), src);
if (concreteNames.length == 0) {
throw new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING, src),
RestStatus.BAD_REQUEST);
}
if (Regex.simpleMatch(src, destIndex)) {
throw new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, destIndex, src),
RestStatus.BAD_REQUEST);
}
concreteSourceIndexNames.addAll(Arrays.asList(concreteNames));
}
if (concreteSourceIndexNames.contains(destIndex)) {
throw new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE,
destIndex,
Strings.arrayToCommaDelimitedString(config.getSource().getIndex())),
RestStatus.BAD_REQUEST
);
}
final String[] concreteDest =
indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), destIndex);
if (concreteDest.length > 1) {
throw new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_SINGLE_INDEX, destIndex),
RestStatus.BAD_REQUEST
);
}
if (concreteDest.length > 0 && concreteSourceIndexNames.contains(concreteDest[0])) {
throw new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE,
concreteDest[0],
Strings.arrayToCommaDelimitedString(concreteSourceIndexNames.toArray(new String[0]))),
RestStatus.BAD_REQUEST
);
}
}
}

View File

@ -0,0 +1,147 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.transforms;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DestConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.SourceConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfigTests;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.hamcrest.Matchers.equalTo;
public class SourceDestValidatorTests extends ESTestCase {
private static final String SOURCE_1 = "source-1";
private static final String SOURCE_2 = "source-2";
private static final String ALIASED_DEST = "aliased-dest";
private static final ClusterState CLUSTER_STATE;
static {
IndexMetaData source1 = IndexMetaData.builder(SOURCE_1).settings(Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)
.put(SETTING_CREATION_DATE, System.currentTimeMillis()))
.putAlias(AliasMetaData.builder("source-1-alias").build())
.build();
IndexMetaData source2 = IndexMetaData.builder(SOURCE_2).settings(Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)
.put(SETTING_CREATION_DATE, System.currentTimeMillis()))
.putAlias(AliasMetaData.builder("dest-alias").build())
.build();
IndexMetaData aliasedDest = IndexMetaData.builder(ALIASED_DEST).settings(Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)
.put(SETTING_CREATION_DATE, System.currentTimeMillis()))
.putAlias(AliasMetaData.builder("dest-alias").build())
.build();
ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
state.metaData(MetaData.builder()
.put(IndexMetaData.builder(source1).build(), false)
.put(IndexMetaData.builder(source2).build(), false)
.put(IndexMetaData.builder(aliasedDest).build(), false));
CLUSTER_STATE = state.build();
}
public void testCheck_GivenSimpleSourceIndexAndValidDestIndex() {
DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("dest", null));
SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver());
}
public void testCheck_GivenMissingConcreteSourceIndex() {
DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig("missing"), new DestConfig("dest", null));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver()));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("Source index [missing] does not exist"));
}
public void testCheck_GivenMissingWildcardSourceIndex() {
DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig("missing*"), new DestConfig("dest", null));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver()));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("Source index [missing*] does not exist"));
}
public void testCheck_GivenDestIndexSameAsSourceIndex() {
DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("source-1", null));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver()));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("Destination index [source-1] is included in source expression [source-1]"));
}
public void testCheck_GivenDestIndexMatchesSourceIndex() {
DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig("source-*"), new DestConfig(SOURCE_2, null));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver()));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("Destination index [source-2] is included in source expression [source-*]"));
}
public void testCheck_GivenDestIndexMatchesOneOfSourceIndices() {
DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig("source-1", "source-*"),
new DestConfig(SOURCE_2, null));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver()));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("Destination index [source-2] is included in source expression [source-*]"));
}
public void testCheck_GivenDestIndexIsAliasThatMatchesMultipleIndices() {
DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("dest-alias", null));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver()));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(),
equalTo("Destination index [dest-alias] should refer to a single index"));
}
public void testCheck_GivenDestIndexIsAliasThatIsIncludedInSource() {
DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("source-1-alias", null));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver()));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(),
equalTo("Destination index [source-1] is included in source expression [source-1]"));
}
private static DataFrameTransformConfig createDataFrameTransform(SourceConfig sourceConfig, DestConfig destConfig) {
return new DataFrameTransformConfig("test",
sourceConfig,
destConfig,
TimeValue.timeValueSeconds(60),
null,
null,
PivotConfigTests.randomPivotConfig(),
null);
}
}