[ML] addresses preview bug, and adds check to PUT (#41803) (#41850)

This commit is contained in:
Benjamin Trent 2019-05-06 10:56:26 -05:00 committed by GitHub
parent 79b7ce8697
commit 50fc27e9a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 208 additions and 16 deletions

View File

@@ -21,8 +21,9 @@ public class DataFrameMessages {
"Failed to validate data frame configuration";
public static final String REST_PUT_DATA_FRAME_FAILED_PERSIST_TRANSFORM_CONFIGURATION = "Failed to persist data frame configuration";
public static final String REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_DEST_MAPPINGS = "Failed to deduce dest mappings";
public static final String REST_PUT_DATA_FRAME_FAILED_TO_CREATE_DEST_INDEX = "Failed to create dest index";
public static final String REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING = "Source index [{0}] does not exist";
public static final String REST_PUT_DATA_FRAME_DEST_IN_SOURCE = "Destination index [{0}] is included in source expression [{1}]";
public static final String REST_PUT_DATA_FRAME_DEST_SINGLE_INDEX = "Destination index [{0}] should refer to a single index";
public static final String REST_PUT_DATA_FRAME_INCONSISTENT_ID =
"Inconsistent id; ''{0}'' specified in the body differs from ''{1}'' specified as a URL argument";

View File

@@ -6,21 +6,27 @@
package org.elasticsearch.xpack.dataframe.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
@@ -40,15 +46,20 @@ public class TransportPreviewDataFrameTransformAction extends
private final XPackLicenseState licenseState;
private final Client client;
private final ThreadPool threadPool;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final ClusterService clusterService;
@Inject
public TransportPreviewDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters,
Client client, ThreadPool threadPool, XPackLicenseState licenseState) {
super(PreviewDataFrameTransformAction.NAME,transportService, actionFilters,
(Writeable.Reader<PreviewDataFrameTransformAction.Request>) PreviewDataFrameTransformAction.Request::new);
Client client, ThreadPool threadPool, XPackLicenseState licenseState,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService) {
super(PreviewDataFrameTransformAction.NAME,transportService, actionFilters, PreviewDataFrameTransformAction.Request::new);
this.licenseState = licenseState;
this.client = client;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
}
@Override
@@ -60,7 +71,18 @@ public class TransportPreviewDataFrameTransformAction extends
return;
}
ClusterState clusterState = clusterService.state();
final DataFrameTransformConfig config = request.getConfig();
for(String src : config.getSource().getIndex()) {
String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), src);
if (concreteNames.length == 0) {
listener.onFailure(new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING, src),
RestStatus.BAD_REQUEST));
return;
}
}
Pivot pivot = new Pivot(config.getPivotConfig());

View File

@@ -23,6 +23,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
@@ -51,7 +52,12 @@ import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigMa
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class TransportPutDataFrameTransformAction
@@ -114,14 +120,54 @@ public class TransportPutDataFrameTransformAction
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_TRANSFORM_EXISTS, transformId)));
return;
}
final String destIndex = config.getDestination().getIndex();
Set<String> concreteSourceIndexNames = new HashSet<>();
for(String src : config.getSource().getIndex()) {
if (indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), src).length == 0) {
String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), src);
if (concreteNames.length == 0) {
listener.onFailure(new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING, src),
RestStatus.BAD_REQUEST));
return;
}
if (Regex.simpleMatch(src, destIndex)) {
listener.onFailure(new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, destIndex, src),
RestStatus.BAD_REQUEST
));
return;
}
concreteSourceIndexNames.addAll(Arrays.asList(concreteNames));
}
if (concreteSourceIndexNames.contains(destIndex)) {
listener.onFailure(new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE,
destIndex,
Strings.arrayToCommaDelimitedString(config.getSource().getIndex())),
RestStatus.BAD_REQUEST
));
return;
}
final String[] concreteDest =
indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), destIndex);
if (concreteDest.length > 1 || Regex.isSimpleMatchPattern(destIndex)) {
listener.onFailure(new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_SINGLE_INDEX, destIndex),
RestStatus.BAD_REQUEST
));
return;
}
if (concreteDest.length > 0 && concreteSourceIndexNames.contains(concreteDest[0])) {
listener.onFailure(new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE,
concreteDest[0],
Strings.arrayToCommaDelimitedString(concreteSourceIndexNames.toArray(new String[0]))),
RestStatus.BAD_REQUEST
));
return;
}
// Early check to verify that the user can create the destination index and can read from the source
@@ -131,18 +177,16 @@ public class TransportPutDataFrameTransformAction
.indices(config.getSource().getIndex())
.privileges("read")
.build();
String[] destPrivileges = new String[3];
destPrivileges[0] = "read";
destPrivileges[1] = "index";
List<String> destPrivileges = new ArrayList<>(3);
destPrivileges.add("read");
destPrivileges.add("index");
// If the destination index does not exist, we can assume that we may have to create it on start.
// We should check that the creating user has the privileges to create the index.
if (indexNameExpressionResolver.concreteIndexNames(clusterState,
IndicesOptions.lenientExpandOpen(),
config.getDestination().getIndex()).length == 0) {
destPrivileges[2] = "create_index";
if (concreteDest.length == 0) {
destPrivileges.add("create_index");
}
RoleDescriptor.IndicesPrivileges destIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder()
.indices(config.getDestination().getIndex())
.indices(destIndex)
.privileges(destPrivileges)
.build();
@@ -151,7 +195,6 @@ public class TransportPutDataFrameTransformAction
privRequest.username(username);
privRequest.clusterPrivileges(Strings.EMPTY_ARRAY);
privRequest.indexPrivileges(sourceIndexPrivileges, destIndexPrivileges);
ActionListener<HasPrivilegesResponse> privResponseListener = ActionListener.wrap(
r -> handlePrivsResponse(username, config, r, listener),
listener::onFailure);

View File

@@ -102,3 +102,18 @@ setup:
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
---
"Test preview with non-existing source index":
- do:
catch: /Source index \[does_not_exist\] does not exist/
data_frame.preview_data_frame_transform:
body: >
{
"source": { "index": ["airline-data", "does_not_exist"] },
"pivot": {
"group_by": {
"airline": {"terms": {"field": "airline"}},
"by-hour": {"date_histogram": {"interval": "1h", "field": "time", "format": "yyyy-MM-DD HH"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}

View File

@@ -190,4 +190,115 @@ setup:
transform_id: "_all"
from: 0
size: 10000
---
"Test transform where dest is included in source":
- do:
catch: /Destination index \[airline-data-by-airline\] is included in source expression \[airline-data/
data_frame.put_data_frame_transform:
transform_id: "airline-transform"
body: >
{
"source": {
"index": ["airline-data*"]
},
"dest": { "index": "airline-data-by-airline" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
---
"Test transform where dest is a simple index pattern":
- do:
catch: /Destination index .* should refer to a single index/
data_frame.put_data_frame_transform:
transform_id: "airline-transform"
body: >
{
"source": {
"index": ["airline-data*"]
},
"dest": { "index": "destination*" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
---
"Test alias scenarios":
- do:
indices.create:
index: created-destination-index
- do:
indices.create:
index: second-created-destination-index
- do:
indices.put_alias:
index: airline-data
name: source-index
- do:
indices.put_alias:
index: created-destination-index
name: dest-index
- do:
data_frame.put_data_frame_transform:
transform_id: "transform-from-aliases"
body: >
{
"source": {
"index": "source-index"
},
"dest": { "index": "dest-index" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
- match: { acknowledged: true }
- do:
indices.put_alias:
index: created-destination-index
name: source-index
- do:
catch: /Destination index \[created-destination-index\] is included in source expression \[airline-data,created-destination-index\]/
data_frame.put_data_frame_transform:
transform_id: "transform-from-aliases-failures"
body: >
{
"source": {
"index": "source-index"
},
"dest": { "index": "dest-index" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
- do:
indices.delete_alias:
index: created-destination-index
name: source-index
- do:
indices.put_alias:
index: second-created-destination-index
name: dest-index
- do:
catch: /Destination index \[dest-index\] should refer to a single index/
data_frame.put_data_frame_transform:
transform_id: "airline-transform"
body: >
{
"source": {
"index": ["source-index"]
},
"dest": { "index": "dest-index" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}