From 40cc081ad3bcac12f98f02e9110738771b6c88c0 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Thu, 11 Jul 2019 12:50:50 -0500 Subject: [PATCH] [ML][Data Frame] adds index validations to _start data frame transform (#44191) (#44227) * [ML][Data Frame] adds index validations to _start data frame transform * addressing pr comments --- .../action/PutDataFrameTransformAction.java | 4 + .../TransportPutDataFrameTransformAction.java | 61 ++------ ...ransportStartDataFrameTransformAction.java | 3 + .../transforms/SourceDestValidator.java | 92 +++++++++++ .../transforms/SourceDestValidatorTests.java | 147 ++++++++++++++++++ 5 files changed, 255 insertions(+), 52 deletions(-) create mode 100644 x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/SourceDestValidator.java create mode 100644 x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/SourceDestValidatorTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java index b58c1154a59..5aef1f9e310 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java @@ -64,6 +64,10 @@ public class PutDataFrameTransformAction extends ActionType concreteSourceIndexNames = new HashSet<>(); - for(String src : config.getSource().getIndex()) { - String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), src); - if (concreteNames.length == 0) { - listener.onFailure(new ElasticsearchStatusException( - DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING, src), - RestStatus.BAD_REQUEST)); - return; - } - if (Regex.simpleMatch(src, destIndex)) { - listener.onFailure(new ElasticsearchStatusException( - DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, destIndex, src), - RestStatus.BAD_REQUEST - )); - return; - } - concreteSourceIndexNames.addAll(Arrays.asList(concreteNames)); - } - - if (concreteSourceIndexNames.contains(destIndex)) { - listener.onFailure(new ElasticsearchStatusException( - DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, - destIndex, - Strings.arrayToCommaDelimitedString(config.getSource().getIndex())), - RestStatus.BAD_REQUEST - )); - return; - } - - final String[] concreteDest = - indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), destIndex); - - if (concreteDest.length > 1) { - listener.onFailure(new ElasticsearchStatusException( - DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_SINGLE_INDEX, destIndex), - RestStatus.BAD_REQUEST - )); - return; - } - if (concreteDest.length > 0 && concreteSourceIndexNames.contains(concreteDest[0])) { - listener.onFailure(new ElasticsearchStatusException( - DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, - concreteDest[0], - Strings.arrayToCommaDelimitedString(concreteSourceIndexNames.toArray(new String[0]))), - RestStatus.BAD_REQUEST - )); + try { + SourceDestValidator.check(config, clusterState, indexNameExpressionResolver); + } catch (ElasticsearchStatusException ex) { + listener.onFailure(ex); return; } // Early check to verify that the user can create the destination index and can read from the source if (licenseState.isAuthAllowed()) { + final String destIndex = config.getDestination().getIndex(); + final String[] concreteDest = indexNameExpressionResolver.concreteIndexNames(clusterState, + IndicesOptions.lenientExpandOpen(), + config.getDestination().getIndex()); final String username = securityContext.getUser().principal(); List srcPrivileges = new ArrayList<>(2); srcPrivileges.add("read"); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java index 8c1d942ec97..76241d0f3aa 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java @@ -44,6 +44,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskS import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.persistence.DataframeIndex; +import org.elasticsearch.xpack.dataframe.transforms.SourceDestValidator; import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; import java.io.IOException; @@ -184,6 +185,8 @@ public class TransportStartDataFrameTransformAction extends )); return; } + // Validate source and destination indices + SourceDestValidator.check(config, clusterService.state(), indexNameExpressionResolver); transformTaskHolder.set(createDataFrameTransform(config.getId(), config.getVersion(), config.getFrequency())); final String destinationIndex = config.getDestination().getIndex(); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/SourceDestValidator.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/SourceDestValidator.java new file mode 100644 index 00000000000..411e1787f1c --- /dev/null +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/SourceDestValidator.java @@ -0,0 +1,92 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.dataframe.transforms; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +/** + * This class contains more complex validations in regards to how {@link DataFrameTransformConfig#getSource()} and + * {@link DataFrameTransformConfig#getDestination()} relate to each other. + */ +public final class SourceDestValidator { + + private SourceDestValidator() {} + + /** + * Validates the DataFrameTransformConfiguration source and destination indices. + * + * A simple name validation is done on {@link DataFrameTransformConfig#getDestination()} inside + * {@link org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction} + * + * So, no need to do the name checks here. + * + * @param config DataFrameTransformConfig to validate + * @param clusterState The current ClusterState + * @param indexNameExpressionResolver A valid IndexNameExpressionResolver object + * @throws ElasticsearchStatusException when a validation fails + */ + public static void check(DataFrameTransformConfig config, + ClusterState clusterState, + IndexNameExpressionResolver indexNameExpressionResolver) { + + final String destIndex = config.getDestination().getIndex(); + Set concreteSourceIndexNames = new HashSet<>(); + for(String src : config.getSource().getIndex()) { + String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), src); + if (concreteNames.length == 0) { + throw new ElasticsearchStatusException( + DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING, src), + RestStatus.BAD_REQUEST); + } + if (Regex.simpleMatch(src, destIndex)) { + throw new ElasticsearchStatusException( + DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, destIndex, src), + RestStatus.BAD_REQUEST); + } + concreteSourceIndexNames.addAll(Arrays.asList(concreteNames)); + } + + if (concreteSourceIndexNames.contains(destIndex)) { + throw new ElasticsearchStatusException( + DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, + destIndex, + Strings.arrayToCommaDelimitedString(config.getSource().getIndex())), + RestStatus.BAD_REQUEST + ); + } + + final String[] concreteDest = + indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), destIndex); + + if (concreteDest.length > 1) { + throw new ElasticsearchStatusException( + DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_SINGLE_INDEX, destIndex), + RestStatus.BAD_REQUEST + ); + } + if (concreteDest.length > 0 && concreteSourceIndexNames.contains(concreteDest[0])) { + throw new ElasticsearchStatusException( + DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, + concreteDest[0], + Strings.arrayToCommaDelimitedString(concreteSourceIndexNames.toArray(new String[0]))), + RestStatus.BAD_REQUEST + ); + } + } +} diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/SourceDestValidatorTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/SourceDestValidatorTests.java new file mode 100644 index 00000000000..48f994de6ca --- /dev/null +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/SourceDestValidatorTests.java @@ -0,0 +1,147 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.dataframe.transforms; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasMetaData; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.DestConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.SourceConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfigTests; + +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED; +import static org.hamcrest.Matchers.equalTo; + +public class SourceDestValidatorTests extends ESTestCase { + + private static final String SOURCE_1 = "source-1"; + private static final String SOURCE_2 = "source-2"; + private static final String ALIASED_DEST = "aliased-dest"; + + private static final ClusterState CLUSTER_STATE; + + static { + IndexMetaData source1 = IndexMetaData.builder(SOURCE_1).settings(Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(SETTING_CREATION_DATE, System.currentTimeMillis())) + .putAlias(AliasMetaData.builder("source-1-alias").build()) + .build(); + IndexMetaData source2 = IndexMetaData.builder(SOURCE_2).settings(Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(SETTING_CREATION_DATE, System.currentTimeMillis())) + .putAlias(AliasMetaData.builder("dest-alias").build()) + .build(); + IndexMetaData aliasedDest = IndexMetaData.builder(ALIASED_DEST).settings(Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(SETTING_CREATION_DATE, System.currentTimeMillis())) + .putAlias(AliasMetaData.builder("dest-alias").build()) + .build(); + ClusterState.Builder state = ClusterState.builder(new ClusterName("test")); + state.metaData(MetaData.builder() + .put(IndexMetaData.builder(source1).build(), false) + .put(IndexMetaData.builder(source2).build(), false) + .put(IndexMetaData.builder(aliasedDest).build(), false)); + CLUSTER_STATE = state.build(); + } + + public void testCheck_GivenSimpleSourceIndexAndValidDestIndex() { + DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("dest", null)); + SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver()); + } + + public void testCheck_GivenMissingConcreteSourceIndex() { + DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig("missing"), new DestConfig("dest", null)); + + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, + () -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver())); + assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(e.getMessage(), equalTo("Source index [missing] does not exist")); + } + + public void testCheck_GivenMissingWildcardSourceIndex() { + DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig("missing*"), new DestConfig("dest", null)); + + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, + () -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver())); + assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(e.getMessage(), equalTo("Source index [missing*] does not exist")); + } + + public void testCheck_GivenDestIndexSameAsSourceIndex() { + DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("source-1", null)); + + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, + () -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver())); + assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(e.getMessage(), equalTo("Destination index [source-1] is included in source expression [source-1]")); + } + + public void testCheck_GivenDestIndexMatchesSourceIndex() { + DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig("source-*"), new DestConfig(SOURCE_2, null)); + + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, + () -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver())); + assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(e.getMessage(), equalTo("Destination index [source-2] is included in source expression [source-*]")); + } + + public void testCheck_GivenDestIndexMatchesOneOfSourceIndices() { + DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig("source-1", "source-*"), + new DestConfig(SOURCE_2, null)); + + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, + () -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver())); + assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(e.getMessage(), equalTo("Destination index [source-2] is included in source expression [source-*]")); + } + + public void testCheck_GivenDestIndexIsAliasThatMatchesMultipleIndices() { + DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("dest-alias", null)); + + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, + () -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver())); + assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(e.getMessage(), + equalTo("Destination index [dest-alias] should refer to a single index")); + } + + public void testCheck_GivenDestIndexIsAliasThatIsIncludedInSource() { + DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("source-1-alias", null)); + + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, + () -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver())); + assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(e.getMessage(), + equalTo("Destination index [source-1] is included in source expression [source-1]")); + } + + private static DataFrameTransformConfig createDataFrameTransform(SourceConfig sourceConfig, DestConfig destConfig) { + return new DataFrameTransformConfig("test", + sourceConfig, + destConfig, + TimeValue.timeValueSeconds(60), + null, + null, + PivotConfigTests.randomPivotConfig(), + null); + } +}