From 50fc27e9a00484c5d9e25d21b3d626c1cd7d7ab4 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Mon, 6 May 2019 10:56:26 -0500 Subject: [PATCH] [ML] addresses preview bug, and adds check to PUT (#41803) (#41850) --- .../core/dataframe/DataFrameMessages.java | 3 +- ...nsportPreviewDataFrameTransformAction.java | 30 ++++- .../TransportPutDataFrameTransformAction.java | 65 ++++++++-- .../test/data_frame/preview_transforms.yml | 15 +++ .../test/data_frame/transforms_crud.yml | 111 ++++++++++++++++++ 5 files changed, 208 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java index e027191d8a5..d31892692a5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java @@ -21,8 +21,9 @@ public class DataFrameMessages { "Failed to validate data frame configuration"; public static final String REST_PUT_DATA_FRAME_FAILED_PERSIST_TRANSFORM_CONFIGURATION = "Failed to persist data frame configuration"; public static final String REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_DEST_MAPPINGS = "Failed to deduce dest mappings"; - public static final String REST_PUT_DATA_FRAME_FAILED_TO_CREATE_DEST_INDEX = "Failed to create dest index"; public static final String REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING = "Source index [{0}] does not exist"; + public static final String REST_PUT_DATA_FRAME_DEST_IN_SOURCE = "Destination index [{0}] is included in source expression [{1}]"; + public static final String REST_PUT_DATA_FRAME_DEST_SINGLE_INDEX = "Destination index [{0}] should refer to a single index"; public static final String REST_PUT_DATA_FRAME_INCONSISTENT_ID = "Inconsistent id; ''{0}'' specified in the body differs from ''{1}'' specified as a URL argument"; diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java index 36943f39f8e..f4b93cc6ac4 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java @@ -6,21 +6,27 @@ package org.elasticsearch.xpack.dataframe.action; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.XPackField; +import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; @@ -40,15 +46,20 @@ public class TransportPreviewDataFrameTransformAction extends private final XPackLicenseState licenseState; private final Client client; private final ThreadPool threadPool; + private final IndexNameExpressionResolver indexNameExpressionResolver; + private final ClusterService clusterService; @Inject public TransportPreviewDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters, - Client client, ThreadPool threadPool, XPackLicenseState licenseState) { - super(PreviewDataFrameTransformAction.NAME,transportService, actionFilters, - (Writeable.Reader) PreviewDataFrameTransformAction.Request::new); + Client client, ThreadPool threadPool, XPackLicenseState licenseState, + IndexNameExpressionResolver indexNameExpressionResolver, + ClusterService clusterService) { + super(PreviewDataFrameTransformAction.NAME,transportService, actionFilters, PreviewDataFrameTransformAction.Request::new); this.licenseState = licenseState; this.client = client; this.threadPool = threadPool; + this.clusterService = clusterService; + this.indexNameExpressionResolver = indexNameExpressionResolver; } @Override @@ -60,7 +71,18 @@ public class TransportPreviewDataFrameTransformAction extends return; } + ClusterState clusterState = clusterService.state(); + final DataFrameTransformConfig config = request.getConfig(); + for(String src : config.getSource().getIndex()) { + String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), src); + if (concreteNames.length == 0) { + listener.onFailure(new ElasticsearchStatusException( + DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING, src), + RestStatus.BAD_REQUEST)); + return; + } + } Pivot pivot = new Pivot(config.getPivotConfig()); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java index e761c33d569..0b8ef692cdd 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; @@ -51,7 +52,12 @@ import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigMa import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; public class TransportPutDataFrameTransformAction @@ -114,14 +120,54 @@ public class TransportPutDataFrameTransformAction DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_TRANSFORM_EXISTS, transformId))); return; } - + final String destIndex = config.getDestination().getIndex(); + Set concreteSourceIndexNames = new HashSet<>(); for(String src : config.getSource().getIndex()) { - if (indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), src).length == 0) { + String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), src); + if (concreteNames.length == 0) { listener.onFailure(new ElasticsearchStatusException( DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING, src), RestStatus.BAD_REQUEST)); return; } + if (Regex.simpleMatch(src, destIndex)) { + listener.onFailure(new ElasticsearchStatusException( + DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, destIndex, src), + RestStatus.BAD_REQUEST + )); + return; + } + concreteSourceIndexNames.addAll(Arrays.asList(concreteNames)); + } + + if (concreteSourceIndexNames.contains(destIndex)) { + listener.onFailure(new ElasticsearchStatusException( + DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, + destIndex, + Strings.arrayToCommaDelimitedString(config.getSource().getIndex())), + RestStatus.BAD_REQUEST + )); + return; + } + + final String[] concreteDest = + indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), destIndex); + + if (concreteDest.length > 1 || Regex.isSimpleMatchPattern(destIndex)) { + listener.onFailure(new ElasticsearchStatusException( + DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_SINGLE_INDEX, destIndex), + RestStatus.BAD_REQUEST + )); + return; + } + if (concreteDest.length > 0 && concreteSourceIndexNames.contains(concreteDest[0])) { + listener.onFailure(new ElasticsearchStatusException( + DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, + concreteDest[0], + Strings.arrayToCommaDelimitedString(concreteSourceIndexNames.toArray(new String[0]))), + RestStatus.BAD_REQUEST + )); + return; } // Early check to verify that the user can create the destination index and can read from the source @@ -131,18 +177,16 @@ public class TransportPutDataFrameTransformAction .indices(config.getSource().getIndex()) .privileges("read") .build(); - String[] destPrivileges = new String[3]; - destPrivileges[0] = "read"; - destPrivileges[1] = "index"; + List destPrivileges = new ArrayList<>(3); + destPrivileges.add("read"); + destPrivileges.add("index"); // If the destination index does not exist, we can assume that we may have to create it on start. // We should check that the creating user has the privileges to create the index. - if (indexNameExpressionResolver.concreteIndexNames(clusterState, - IndicesOptions.lenientExpandOpen(), - config.getDestination().getIndex()).length == 0) { - destPrivileges[2] = "create_index"; + if (concreteDest.length == 0) { + destPrivileges.add("create_index"); } RoleDescriptor.IndicesPrivileges destIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder() - .indices(config.getDestination().getIndex()) + .indices(destIndex) .privileges(destPrivileges) .build(); @@ -151,7 +195,6 @@ public class TransportPutDataFrameTransformAction privRequest.username(username); privRequest.clusterPrivileges(Strings.EMPTY_ARRAY); privRequest.indexPrivileges(sourceIndexPrivileges, destIndexPrivileges); - ActionListener privResponseListener = ActionListener.wrap( r -> handlePrivsResponse(username, config, r, listener), listener::onFailure); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/preview_transforms.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/preview_transforms.yml index dede9e55999..94388784177 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/preview_transforms.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/preview_transforms.yml @@ -102,3 +102,18 @@ setup: "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} } } +--- +"Test preview with non-existing source index": + - do: + catch: /Source index \[does_not_exist\] does not exist/ + data_frame.preview_data_frame_transform: + body: > + { + "source": { "index": ["airline-data", "does_not_exist"] }, + "pivot": { + "group_by": { + "airline": {"terms": {"field": "airline"}}, + "by-hour": {"date_histogram": {"interval": "1h", "field": "time", "format": "yyyy-MM-DD HH"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml index b7f951679d1..fa608cefd1e 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml @@ -190,4 +190,115 @@ setup: transform_id: "_all" from: 0 size: 10000 +--- +"Test transform where dest is included in source": + - do: + catch: /Destination index \[airline-data-by-airline\] is included in source expression \[airline-data/ + data_frame.put_data_frame_transform: + transform_id: "airline-transform" + body: > + { + "source": { + "index": ["airline-data*"] + }, + "dest": { "index": "airline-data-by-airline" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } +--- +"Test transform where dest is a simple index pattern": + - do: + catch: /Destination index .* should refer to a single index/ + data_frame.put_data_frame_transform: + transform_id: "airline-transform" + body: > + { + "source": { + "index": ["airline-data*"] + }, + "dest": { "index": "destination*" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } +--- +"Test alias scenarios": + - do: + indices.create: + index: created-destination-index + - do: + indices.create: + index: second-created-destination-index + - do: + indices.put_alias: + index: airline-data + name: source-index + - do: + indices.put_alias: + index: created-destination-index + name: dest-index + - do: + data_frame.put_data_frame_transform: + transform_id: "transform-from-aliases" + body: > + { + "source": { + "index": "source-index" + }, + "dest": { "index": "dest-index" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + - match: { acknowledged: true } + - do: + indices.put_alias: + index: created-destination-index + name: source-index + + - do: + catch: /Destination index \[created-destination-index\] is included in source expression \[airline-data,created-destination-index\]/ + data_frame.put_data_frame_transform: + transform_id: "transform-from-aliases-failures" + body: > + { + "source": { + "index": "source-index" + }, + "dest": { "index": "dest-index" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + + - do: + indices.delete_alias: + index: created-destination-index + name: source-index + + - do: + indices.put_alias: + index: second-created-destination-index + name: dest-index + + - do: + catch: /Destination index \[dest-index\] should refer to a single index/ + data_frame.put_data_frame_transform: + transform_id: "airline-transform" + body: > + { + "source": { + "index": ["source-index"] + }, + "dest": { "index": "dest-index" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + }