diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java index df9ab53c1ce..728505f3de0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java @@ -20,16 +20,17 @@ public class DataFrameMessages { public static final String REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION = "Failed to validate data frame configuration"; public static final String REST_PUT_DATA_FRAME_FAILED_PERSIST_TRANSFORM_CONFIGURATION = "Failed to persist data frame configuration"; - public static final String REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_TARGET_MAPPINGS = "Failed to deduce target mappings"; - public static final String REST_PUT_DATA_FRAME_FAILED_TO_CREATE_TARGET_INDEX = "Failed to create target index"; - public static final String REST_PUT_DATA_FRAME_FAILED_TO_START_PERSISTENT_TASK = - "Failed to start persistent task, configuration has been cleaned up: [{0}]"; + public static final String REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_DEST_MAPPINGS = "Failed to deduce dest mappings"; + public static final String REST_PUT_DATA_FRAME_FAILED_TO_CREATE_DEST_INDEX = "Failed to create dest index"; + public static final String REST_PUT_DATA_FRAME_DEST_INDEX_ALREADY_EXISTS = "dest index [{0}] already exists"; + public static final String REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING = "Source index [{0}] does not exist"; public static final String REST_PUT_DATA_FRAME_INCONSISTENT_ID = "Inconsistent id; ''{0}'' specified in the body differs from ''{1}'' specified as a URL argument"; + public static final String DATA_FRAME_CONFIG_INVALID = "Data frame transform configuration is invalid [{0}]"; public static final String REST_DATA_FRAME_FAILED_TO_SERIALIZE_TRANSFORM = "Failed to serialise transform [{0}]"; - public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform[{1}]"; + public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform [{1}]"; public static final String FAILED_TO_LOAD_TRANSFORM_CONFIGURATION = "Failed to load data frame transform configuration for transform [{0}]"; public static final String FAILED_TO_PARSE_TRANSFORM_CONFIGURATION = diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/GetDataFrameTransformsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/GetDataFrameTransformsAction.java index 7759e7d3f16..86db09e9f21 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/GetDataFrameTransformsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/GetDataFrameTransformsAction.java @@ -8,12 +8,10 @@ package org.elasticsearch.xpack.core.dataframe.action; import org.apache.logging.log4j.LogManager; import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.FailedNodeException; -import org.elasticsearch.action.TaskOperationFailure; -import org.elasticsearch.action.support.tasks.BaseTasksRequest; -import org.elasticsearch.action.support.tasks.BaseTasksResponse; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.ParseField; @@ -25,7 +23,6 @@ import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.tasks.Task; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; @@ -52,7 +49,7 @@ public class GetDataFrameTransformsAction extends Action implements ToXContent { + public static class Request extends ActionRequest implements ToXContent { private String id; public Request(String id) { @@ -63,23 +60,14 @@ public class GetDataFrameTransformsAction extends Action transformConfigurations; public Response(List transformConfigs) { - super(Collections.emptyList(), Collections.emptyList()); - this.transformConfigurations = transformConfigs; - } - - public Response(List transformConfigs, List taskFailures, - List nodeFailures) { - super(taskFailures, nodeFailures); this.transformConfigurations = transformConfigs; } public Response() { - super(Collections.emptyList(), Collections.emptyList()); + this.transformConfigurations = Collections.emptyList(); } public Response(StreamInput in) throws IOException { - super(in); readFrom(in); } @@ -173,7 +153,6 @@ public class GetDataFrameTransformsAction extends Action invalidTransforms = new ArrayList<>(); builder.startObject(); - toXContentCommon(builder, params); builder.field(DataFrameField.COUNT.getPreferredName(), transformConfigurations.size()); // XContentBuilder does not support passing the params object for Iterables builder.field(DataFrameField.TRANSFORMS.getPreferredName()); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformAction.java index a9e9538288f..0ac94b6c6aa 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformAction.java @@ -9,7 +9,7 @@ package org.elasticsearch.xpack.core.dataframe.action; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.support.tasks.BaseTasksRequest; +import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.io.stream.StreamInput; @@ -39,14 +39,15 @@ public class StartDataFrameTransformAction extends Action implements ToXContent { + public static class Request extends AcknowledgedRequest implements ToXContent { + private String id; public Request(String id) { this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName()); } - private Request() { + public Request() { } public Request(StreamInput in) throws IOException { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskAction.java new file mode 100644 index 00000000000..a51b9243c3d --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskAction.java @@ -0,0 +1,164 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.dataframe.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.tasks.BaseTasksRequest; +import org.elasticsearch.action.support.tasks.BaseTasksResponse; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.dataframe.DataFrameField; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; + +import java.io.IOException; +import java.util.Collections; +import java.util.Objects; + +public class StartDataFrameTransformTaskAction extends Action { + + public static final StartDataFrameTransformTaskAction INSTANCE = new StartDataFrameTransformTaskAction(); + public static final String NAME = "cluster:admin/data_frame/start_task"; + + private StartDataFrameTransformTaskAction() { + super(NAME); + } + + @Override + public Response newResponse() { + return new Response(); + } + + public static class Request extends BaseTasksRequest implements ToXContent { + + private String id; + + public Request(String id) { + this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName()); + } + + public Request() { + } + + public Request(StreamInput in) throws IOException { + super(in); + id = in.readString(); + } + + public String getId() { + return id; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(id); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(DataFrameField.ID.getPreferredName(), id); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(id); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Request other = (Request) obj; + return Objects.equals(id, other.id); + } + } + + public static class RequestBuilder extends ActionRequestBuilder { + + protected RequestBuilder(ElasticsearchClient client, StartDataFrameTransformTaskAction action) { + super(client, action, new Request()); + } + } + + public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject { + private boolean started; + + public Response() { + super(Collections.emptyList(), Collections.emptyList()); + } + + public Response(StreamInput in) throws IOException { + super(in); + readFrom(in); + } + + public Response(boolean started) { + super(Collections.emptyList(), Collections.emptyList()); + this.started = started; + } + + public boolean isStarted() { + return started; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + started = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(started); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + toXContentCommon(builder, params); + builder.field("started", started); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + Response response = (Response) obj; + return started == response.started; + } + + @Override + public int hashCode() { + return Objects.hash(started); + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java index d50716317a0..6172bb2de1f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java @@ -36,7 +36,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona */ public class DataFrameTransformConfig extends AbstractDiffable implements Writeable, ToXContentObject { - private static final String NAME = "data_frame_transform_config"; + public static final String NAME = "data_frame_transform_config"; public static final ParseField HEADERS = new ParseField("headers"); public static final ParseField QUERY = new ParseField("query"); diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java index a824e63e31f..431905ed75e 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java @@ -218,8 +218,11 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase { protected static String getDataFrameIndexerState(String transformId) throws IOException { Response statsResponse = client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + transformId + "/_stats")); - - Map transformStatsAsMap = (Map) ((List) entityAsMap(statsResponse).get("transforms")).get(0); + List transforms = ((List) entityAsMap(statsResponse).get("transforms")); + if (transforms.isEmpty()) { + return null; + } + Map transformStatsAsMap = (Map) transforms.get(0); return (String) XContentMapValues.extractValue("state.transform_state", transformStatsAsMap); } @@ -234,7 +237,6 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase { protected static void wipeDataFrameTransforms() throws IOException, InterruptedException { List> transformConfigs = getDataFrameTransforms(); - for (Map transformConfig : transformConfigs) { String transformId = (String) transformConfig.get("id"); Request request = new Request("POST", DATAFRAME_ENDPOINT + transformId + "/_stop"); @@ -242,7 +244,10 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase { request.addParameter("timeout", "10s"); request.addParameter("ignore", "404"); adminClient().performRequest(request); - assertEquals("stopped", getDataFrameIndexerState(transformId)); + String state = getDataFrameIndexerState(transformId); + if (state != null) { + assertEquals("stopped", getDataFrameIndexerState(transformId)); + } } for (Map transformConfig : transformConfigs) { diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java index c5436049c1a..5fcdba603ee 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java @@ -35,7 +35,7 @@ public class DataFrameUsageIT extends DataFrameRestTestCase { indicesCreated = true; } - public void testUsage() throws IOException { + public void testUsage() throws Exception { Response usageResponse = client().performRequest(new Request("GET", "_xpack/usage")); Map usageAsMap = entityAsMap(usageResponse); @@ -47,12 +47,20 @@ public class DataFrameUsageIT extends DataFrameRestTestCase { // create a transform createPivotReviewsTransform("test_usage", "pivot_reviews", null); + usageResponse = client().performRequest(new Request("GET", "_xpack/usage")); + usageAsMap = entityAsMap(usageResponse); + assertEquals(1, XContentMapValues.extractValue("data_frame.transforms._all", usageAsMap)); + assertEquals(1, XContentMapValues.extractValue("data_frame.transforms.stopped", usageAsMap)); + + // TODO remove as soon as stats are stored in an index instead of ClusterState with the task + startAndWaitForTransform("test_usage", "pivot_reviews"); usageResponse = client().performRequest(new Request("GET", "_xpack/usage")); usageAsMap = entityAsMap(usageResponse); // we should see some stats assertEquals(1, XContentMapValues.extractValue("data_frame.transforms._all", usageAsMap)); + assertEquals(1, XContentMapValues.extractValue("data_frame.transforms.started", usageAsMap)); assertEquals(0, XContentMapValues.extractValue("data_frame.stats.index_failures", usageAsMap)); } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java index 67d16c654ad..ced9ab8512a 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java @@ -46,6 +46,7 @@ import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStats import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction; +import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction; import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.dataframe.action.TransportDeleteDataFrameTransformAction; @@ -54,6 +55,7 @@ import org.elasticsearch.xpack.dataframe.action.TransportGetDataFrameTransformsS import org.elasticsearch.xpack.dataframe.action.TransportPreviewDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.action.TransportPutDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransformAction; +import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransformTaskAction; import org.elasticsearch.xpack.dataframe.action.TransportStopDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; @@ -148,6 +150,7 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu return Arrays.asList( new ActionHandler<>(PutDataFrameTransformAction.INSTANCE, TransportPutDataFrameTransformAction.class), new ActionHandler<>(StartDataFrameTransformAction.INSTANCE, TransportStartDataFrameTransformAction.class), + new ActionHandler<>(StartDataFrameTransformTaskAction.INSTANCE, TransportStartDataFrameTransformTaskAction.class), new ActionHandler<>(StopDataFrameTransformAction.INSTANCE, TransportStopDataFrameTransformAction.class), new ActionHandler<>(DeleteDataFrameTransformAction.INSTANCE, TransportDeleteDataFrameTransformAction.class), new ActionHandler<>(GetDataFrameTransformsAction.INSTANCE, TransportGetDataFrameTransformsAction.class), diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSet.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSet.java index 7bb9ecd3095..07b2bfa251c 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSet.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSet.java @@ -17,13 +17,18 @@ import org.elasticsearch.xpack.core.XPackFeatureSet; import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.dataframe.DataFrameFeatureSetUsage; +import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.xpack.core.indexing.IndexerState; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; public class DataFrameFeatureSet implements XPackFeatureSet { @@ -71,18 +76,33 @@ public class DataFrameFeatureSet implements XPackFeatureSet { return; } - GetDataFrameTransformsStatsAction.Request transformStatsRequest = new GetDataFrameTransformsStatsAction.Request(MetaData.ALL); + GetDataFrameTransformsAction.Request transformsRequest = new GetDataFrameTransformsAction.Request(MetaData.ALL); + client.execute(GetDataFrameTransformsAction.INSTANCE, transformsRequest, ActionListener.wrap( + transforms -> { + Set transformIds = transforms.getTransformConfigurations() + .stream() + .map(DataFrameTransformConfig::getId) + .collect(Collectors.toSet()); + GetDataFrameTransformsStatsAction.Request transformStatsRequest = + new GetDataFrameTransformsStatsAction.Request(MetaData.ALL); + client.execute(GetDataFrameTransformsStatsAction.INSTANCE, + transformStatsRequest, + ActionListener.wrap(transformStatsResponse -> { + Map transformsCountByState = new HashMap<>(); + DataFrameIndexerTransformStats accumulatedStats = new DataFrameIndexerTransformStats(); - client.execute(GetDataFrameTransformsStatsAction.INSTANCE, transformStatsRequest, ActionListener.wrap(transformStatsResponse -> { - Map transformsCountByState = new HashMap<>(); - DataFrameIndexerTransformStats accumulatedStats = new DataFrameIndexerTransformStats(); + transformStatsResponse.getTransformsStateAndStats().forEach(singleResult -> { + transformIds.remove(singleResult.getId()); + transformsCountByState.merge(singleResult.getTransformState().getIndexerState().value(), 1L, Long::sum); + accumulatedStats.merge(singleResult.getTransformStats()); + }); + // If there is no state returned, assumed stopped + transformIds.forEach(ignored -> transformsCountByState.merge(IndexerState.STOPPED.value(), 1L, Long::sum)); - transformStatsResponse.getTransformsStateAndStats().stream().forEach(singleResult -> { - transformsCountByState.merge(singleResult.getTransformState().getIndexerState().value(), 1L, Long::sum); - accumulatedStats.merge(singleResult.getTransformStats()); - }); - - listener.onResponse(new DataFrameFeatureSetUsage(available(), enabled(), transformsCountByState, accumulatedStats)); - }, listener::onFailure)); + listener.onResponse(new DataFrameFeatureSetUsage(available(), enabled(), transformsCountByState, accumulatedStats)); + }, listener::onFailure)); + }, + listener::onFailure + )); } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsAction.java index be73634fb5c..43372667fd3 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsAction.java @@ -7,98 +7,34 @@ package org.elasticsearch.xpack.dataframe.action; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionListenerResponseHandler; -import org.elasticsearch.action.FailedNodeException; -import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.tasks.TransportTasksAction; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.tasks.Task; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction; import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction.Request; import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction.Response; -import org.elasticsearch.xpack.dataframe.persistence.DataFramePersistentTaskUtils; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; -import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; -import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.stream.Collectors; -public class TransportGetDataFrameTransformsAction extends - TransportTasksAction { +public class TransportGetDataFrameTransformsAction extends HandledTransportAction { private final DataFrameTransformsConfigManager transformsConfigManager; @Inject public TransportGetDataFrameTransformsAction(TransportService transportService, ActionFilters actionFilters, - ClusterService clusterService, DataFrameTransformsConfigManager transformsConfigManager) { - super(GetDataFrameTransformsAction.NAME, clusterService, transportService, actionFilters, GetDataFrameTransformsAction.Request::new, - GetDataFrameTransformsAction.Response::new, GetDataFrameTransformsAction.Response::new, ThreadPool.Names.SAME); + DataFrameTransformsConfigManager transformsConfigManager) { + super(GetDataFrameTransformsAction.NAME, transportService, actionFilters, () -> new Request()); this.transformsConfigManager = transformsConfigManager; } - @Override - protected Response newResponse(Request request, List tasks, List taskOperationFailures, - List failedNodeExceptions) { - List configs = tasks.stream() - .flatMap(r -> r.getTransformConfigurations().stream()) - .sorted(Comparator.comparing(DataFrameTransformConfig::getId)) - .collect(Collectors.toList()); - return new Response(configs, taskOperationFailures, failedNodeExceptions); - } - - @Override - protected void taskOperation(Request request, DataFrameTransformTask task, ActionListener listener) { - assert task.getTransformId().equals(request.getId()) || request.getId().equals(MetaData.ALL); - // Little extra insurance, make sure we only return transforms that aren't cancelled - if (task.isCancelled() == false) { - transformsConfigManager.getTransformConfiguration(task.getTransformId(), ActionListener.wrap(config -> { - listener.onResponse(new Response(Collections.singletonList(config))); - }, e -> { - listener.onFailure(new RuntimeException("failed to retrieve...", e)); - })); - } else { - listener.onResponse(new Response(Collections.emptyList())); - } - } - @Override protected void doExecute(Task task, Request request, ActionListener listener) { - final ClusterState state = clusterService.state(); - final DiscoveryNodes nodes = state.nodes(); - - if (nodes.isLocalNodeElectedMaster()) { - if (DataFramePersistentTaskUtils.stateHasDataFrameTransforms(request.getId(), state)) { - super.doExecute(task, request, listener); - } else { - // If we couldn't find the transform in the persistent task CS, it means it was deleted prior to this GET - // and we can just send an empty response, no need to go looking for the allocated task - listener.onResponse(new Response(Collections.emptyList())); - } - - } else { - // Delegates GetTransforms to elected master node, so it becomes the coordinating node. - // Non-master nodes may have a stale cluster state that shows transforms which are cancelled - // on the master, which makes testing difficult. - if (nodes.getMasterNode() == null) { - listener.onFailure(new MasterNotDiscoveredException("no known master nodes")); - } else { - transportService.sendRequest(nodes.getMasterNode(), actionName, request, - new ActionListenerResponseHandler<>(listener, Response::new)); - } - } + //TODO support comma delimited and simple regex IDs + transformsConfigManager.getTransformConfigurations(request.getId(), ActionListener.wrap( + configs -> listener.onResponse(new Response(configs)), + listener::onFailure + )); } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java index 61329d0bf31..a3cd220f257 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java @@ -73,6 +73,7 @@ public class TransportGetDataFrameTransformsStatsAction extends } @Override + // TODO gather stats from docs when moved out of allocated task protected void doExecute(Task task, Request request, ActionListener listener) { final ClusterState state = clusterService.state(); final DiscoveryNodes nodes = state.nodes(); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java index b2734213db4..b696243cc5d 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java @@ -8,9 +8,13 @@ package org.elasticsearch.xpack.dataframe.action; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; @@ -18,27 +22,39 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksService; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.XPackPlugin; +import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction.Request; import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction.Response; +import org.elasticsearch.xpack.core.security.SecurityContext; +import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction; +import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest; +import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesResponse; +import org.elasticsearch.xpack.core.security.authz.RoleDescriptor; +import org.elasticsearch.xpack.core.security.authz.permission.ResourcePrivileges; +import org.elasticsearch.xpack.core.security.support.Exceptions; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.persistence.DataframeIndex; -import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; +import java.io.IOException; import java.util.Map; import java.util.stream.Collectors; @@ -48,21 +64,23 @@ public class TransportPutDataFrameTransformAction private static final Logger logger = LogManager.getLogger(TransportPutDataFrameTransformAction.class); private final XPackLicenseState licenseState; - private final PersistentTasksService persistentTasksService; private final Client client; private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager; + private final SecurityContext securityContext; @Inject - public TransportPutDataFrameTransformAction(TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, XPackLicenseState licenseState, - PersistentTasksService persistentTasksService, DataFrameTransformsConfigManager dataFrameTransformsConfigManager, - Client client) { + public TransportPutDataFrameTransformAction(Settings settings, TransportService transportService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + ClusterService clusterService, XPackLicenseState licenseState, + PersistentTasksService persistentTasksService, + DataFrameTransformsConfigManager dataFrameTransformsConfigManager, Client client) { super(PutDataFrameTransformAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, PutDataFrameTransformAction.Request::new); this.licenseState = licenseState; - this.persistentTasksService = persistentTasksService; this.client = client; this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager; + this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings) ? + new SecurityContext(settings, threadPool.getThreadContext()) : null; } @Override @@ -101,62 +119,130 @@ public class TransportPutDataFrameTransformAction return; } - // create the transform, for now we only have pivot and no support for custom queries - Pivot pivot = new Pivot(config.getSource(), new MatchAllQueryBuilder(), config.getPivotConfig()); + String[] dest = indexNameExpressionResolver.concreteIndexNames(clusterState, + IndicesOptions.lenientExpandOpen(), + config.getDestination()); - // the non-state creating steps are done first, so we minimize the chance to end up with orphaned state transform validation - pivot.validate(client, ActionListener.wrap(validationResult -> { - // deduce target mappings - pivot.deduceMappings(client, ActionListener.wrap(mappings -> { - // create the destination index - DataframeIndex.createDestinationIndex(client, config, mappings, ActionListener.wrap(createIndexResult -> { - DataFrameTransform transform = createDataFrameTransform(transformId, threadPool); - // create the transform configuration and store it in the internal index - dataFrameTransformsConfigManager.putTransformConfiguration(config, ActionListener.wrap(r -> { - // finally start the persistent task - persistentTasksService.sendStartRequest(transform.getId(), DataFrameTransform.NAME, transform, - ActionListener.wrap(persistentTask -> { - listener.onResponse(new PutDataFrameTransformAction.Response(true)); - }, startPersistentTaskException -> { - // delete the otherwise orphaned transform configuration, for now we do not delete the destination index - dataFrameTransformsConfigManager.deleteTransformConfiguration(transformId, ActionListener.wrap(r2 -> { - logger.debug("Deleted data frame transform [{}] configuration from data frame configuration index", - transformId); - listener.onFailure( - new RuntimeException( - DataFrameMessages.getMessage( - DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_START_PERSISTENT_TASK, r2), - startPersistentTaskException)); - }, deleteTransformFromIndexException -> { - logger.error("Failed to cleanup orphaned data frame transform [{}] configuration", transformId); - listener.onFailure( - new RuntimeException( - DataFrameMessages.getMessage( - DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_START_PERSISTENT_TASK, false), - startPersistentTaskException)); - })); - })); - }, listener::onFailure)); - }, createDestinationIndexException -> { - listener.onFailure(new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_CREATE_TARGET_INDEX, - createDestinationIndexException)); - })); - }, deduceTargetMappingsException -> { - listener.onFailure(new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_TARGET_MAPPINGS, - deduceTargetMappingsException)); - })); - }, validationException -> { - listener.onFailure(new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION, - validationException)); - })); - } + if (dest.length > 0) { + listener.onFailure(new ElasticsearchStatusException( + DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_INDEX_ALREADY_EXISTS, config.getDestination()), + RestStatus.BAD_REQUEST)); + return; + } - private static DataFrameTransform createDataFrameTransform(String transformId, ThreadPool threadPool) { - return new DataFrameTransform(transformId); + String[] src = indexNameExpressionResolver.concreteIndexNames(clusterState, + IndicesOptions.lenientExpandOpen(), + config.getSource()); + if (src.length == 0) { + listener.onFailure(new ElasticsearchStatusException( + DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING, config.getSource()), + RestStatus.BAD_REQUEST)); + return; + } + + // Early check to verify that the user can create the destination index and can read from the source + if (licenseState.isAuthAllowed()) { + final String username = securityContext.getUser().principal(); + RoleDescriptor.IndicesPrivileges sourceIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder() + .indices(config.getSource()) + .privileges("read") + .build(); + RoleDescriptor.IndicesPrivileges destIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder() + .indices(config.getDestination()) + .privileges("read", "index", "create_index") + .build(); + + HasPrivilegesRequest privRequest = new HasPrivilegesRequest(); + privRequest.applicationPrivileges(new RoleDescriptor.ApplicationResourcePrivileges[0]); + privRequest.username(username); + privRequest.clusterPrivileges(Strings.EMPTY_ARRAY); + privRequest.indexPrivileges(sourceIndexPrivileges, destIndexPrivileges); + + ActionListener privResponseListener = ActionListener.wrap( + r -> handlePrivsResponse(username, config, r, listener), + listener::onFailure); + + client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener); + } else { // No security enabled, just create the transform + putDataFrame(config, listener); + } } @Override protected ClusterBlockException checkBlock(PutDataFrameTransformAction.Request request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); } + + private void handlePrivsResponse(String username, + DataFrameTransformConfig config, + HasPrivilegesResponse privilegesResponse, + ActionListener listener) throws IOException { + if (privilegesResponse.isCompleteMatch()) { + putDataFrame(config, listener); + } else { + XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + for (ResourcePrivileges index : privilegesResponse.getIndexPrivileges()) { + builder.field(index.getResource()); + builder.map(index.getPrivileges()); + } + builder.endObject(); + + listener.onFailure(Exceptions.authorizationError("Cannot create data frame transform [{}]" + + " because user {} lacks permissions on the indices: {}", + config.getId(), username, Strings.toString(builder))); + } + } + + private void putDataFrame(DataFrameTransformConfig config, ActionListener listener) { + + final Pivot pivot = new Pivot(config.getSource(), config.getQueryConfig().getQuery(), config.getPivotConfig()); + + + // <5> Return the listener, or clean up destination index on failure. + ActionListener putTransformConfigurationListener = ActionListener.wrap( + putTransformConfigurationResult -> listener.onResponse(new Response(true)), + putTransformConfigurationException -> + ClientHelper.executeAsyncWithOrigin(client, + ClientHelper.DATA_FRAME_ORIGIN, + DeleteIndexAction.INSTANCE, + new DeleteIndexRequest(config.getDestination()), ActionListener.wrap( + deleteIndexResponse -> listener.onFailure(putTransformConfigurationException), + deleteIndexException -> { + String msg = "Failed to delete destination index after creating transform [" + config.getId() + "] failed"; + listener.onFailure( + new ElasticsearchStatusException(msg, + RestStatus.INTERNAL_SERVER_ERROR, + putTransformConfigurationException)); + }) + ) + ); + + // <4> Put our transform + ActionListener createDestinationIndexListener = ActionListener.wrap( + createIndexResult -> dataFrameTransformsConfigManager.putTransformConfiguration(config, putTransformConfigurationListener), + createDestinationIndexException -> listener.onFailure( + new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_CREATE_DEST_INDEX, + createDestinationIndexException)) + ); + + // <3> Create the destination index + ActionListener> deduceMappingsListener = ActionListener.wrap( + mappings -> DataframeIndex.createDestinationIndex(client, config, mappings, createDestinationIndexListener), + deduceTargetMappingsException -> listener.onFailure( + new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_DEST_MAPPINGS, + deduceTargetMappingsException)) + ); + + // <2> Deduce our mappings for the destination index + ActionListener pivotValidationListener = ActionListener.wrap( + validationResult -> pivot.deduceMappings(client, deduceMappingsListener), + validationException -> listener.onFailure( + new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION, + validationException)) + ); + + // <1> Validate our pivot + pivot.validate(client, pivotValidationListener); + } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java index 91d1232245a..7fa19fa50e8 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java @@ -6,106 +6,238 @@ package org.elasticsearch.xpack.dataframe.action; -import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.FailedNodeException; -import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.tasks.TransportTasksAction; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; -import org.elasticsearch.tasks.Task; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.persistent.PersistentTasksService; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.XPackField; +import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction; -import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask; +import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; -import java.util.List; +import java.util.Collection; import java.util.function.Consumer; +import java.util.function.Predicate; public class TransportStartDataFrameTransformAction extends - TransportTasksAction { + TransportMasterNodeAction { private final XPackLicenseState licenseState; + private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager; + private final PersistentTasksService persistentTasksService; + private final Client client; @Inject public TransportStartDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters, - ClusterService clusterService, XPackLicenseState licenseState) { - super(StartDataFrameTransformAction.NAME, clusterService, transportService, actionFilters, - StartDataFrameTransformAction.Request::new, StartDataFrameTransformAction.Response::new, - StartDataFrameTransformAction.Response::new, ThreadPool.Names.SAME); + ClusterService clusterService, XPackLicenseState licenseState, + ThreadPool threadPool, IndexNameExpressionResolver indexNameExpressionResolver, + DataFrameTransformsConfigManager dataFrameTransformsConfigManager, + PersistentTasksService persistentTasksService, Client client) { + super(StartDataFrameTransformAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, + StartDataFrameTransformAction.Request::new); this.licenseState = licenseState; + this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager; + this.persistentTasksService = persistentTasksService; + this.client = client; } @Override - protected void processTasks(StartDataFrameTransformAction.Request request, Consumer operation) { - DataFrameTransformTask matchingTask = null; - - // todo: re-factor, see rollup TransportTaskHelper - for (Task task : taskManager.getTasks().values()) { - if (task instanceof DataFrameTransformTask - && ((DataFrameTransformTask) task).getTransformId().equals(request.getId())) { - if (matchingTask != null) { - throw new IllegalArgumentException("Found more than one matching task for data frame transform [" + request.getId() - + "] when " + "there should only be one."); - } - matchingTask = (DataFrameTransformTask) task; - } - } - - if (matchingTask != null) { - operation.accept(matchingTask); - } + protected String executor() { + return ThreadPool.Names.SAME; } @Override - protected void doExecute(Task task, StartDataFrameTransformAction.Request request, - ActionListener listener) { + protected StartDataFrameTransformAction.Response newResponse() { + return new StartDataFrameTransformAction.Response(); + } + @Override + protected void masterOperation(StartDataFrameTransformAction.Request request, + ClusterState state, + ActionListener listener) throws Exception { if (!licenseState.isDataFrameAllowed()) { listener.onFailure(LicenseUtils.newComplianceException(XPackField.DATA_FRAME)); return; } + final DataFrameTransform transformTask = createDataFrameTransform(request.getId(), threadPool); - super.doExecute(task, request, listener); + // <3> Set the allocated task's state to STARTED + ActionListener> persistentTaskActionListener = ActionListener.wrap( + task -> { + waitForDataFrameTaskAllocated(task.getId(), + transformTask, + request.timeout(), + ActionListener.wrap( + taskAssigned -> ClientHelper.executeAsyncWithOrigin(client, + ClientHelper.DATA_FRAME_ORIGIN, + StartDataFrameTransformTaskAction.INSTANCE, + new StartDataFrameTransformTaskAction.Request(request.getId()), + ActionListener.wrap( + r -> listener.onResponse(new StartDataFrameTransformAction.Response(true)), + startingFailure -> cancelDataFrameTask(task.getId(), + transformTask.getId(), + startingFailure, + listener::onFailure) + )), + listener::onFailure)); + }, + listener::onFailure + ); + + // <2> Create the task in cluster state so that it will start executing on the node + ActionListener getTransformListener = ActionListener.wrap( + config -> { + if (config.isValid() == false) { + listener.onFailure(new ElasticsearchStatusException( + DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_CONFIG_INVALID, request.getId()), + RestStatus.BAD_REQUEST + )); + return; + } + PersistentTasksCustomMetaData.PersistentTask existingTask = + getExistingTask(transformTask.getId(), state); + if (existingTask == null) { + persistentTasksService.sendStartRequest(transformTask.getId(), + DataFrameTransform.NAME, + transformTask, + persistentTaskActionListener); + } else { + persistentTaskActionListener.onResponse(existingTask); + } + }, + listener::onFailure + ); + + // <1> Get the config to verify it exists and is valid + dataFrameTransformsConfigManager.getTransformConfiguration(request.getId(), getTransformListener); } @Override - protected void taskOperation(StartDataFrameTransformAction.Request request, DataFrameTransformTask transformTask, - ActionListener listener) { - if (transformTask.getTransformId().equals(request.getId())) { - transformTask.start(listener); + protected ClusterBlockException checkBlock(StartDataFrameTransformAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + private static DataFrameTransform createDataFrameTransform(String transformId, ThreadPool threadPool) { + return new DataFrameTransform(transformId); + } + + @SuppressWarnings("unchecked") + private static PersistentTasksCustomMetaData.PersistentTask getExistingTask(String id, ClusterState state) { + PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + if (pTasksMeta == null) { + return null; + } + Collection> existingTask = pTasksMeta.findTasks(DataFrameTransform.NAME, + t -> t.getId().equals(id)); + if (existingTask.isEmpty()) { + return null; } else { - listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId() - + "] does not match request's ID [" + request.getId() + "]")); + assert(existingTask.size() == 1); + PersistentTasksCustomMetaData.PersistentTask pTask = existingTask.iterator().next(); + if (pTask.getParams() instanceof DataFrameTransform) { + return (PersistentTasksCustomMetaData.PersistentTask)pTask; + } + throw new ElasticsearchStatusException("Found data frame transform persistent task [" + id + "] with incorrect params", + RestStatus.INTERNAL_SERVER_ERROR); } } - @Override - protected StartDataFrameTransformAction.Response newResponse(StartDataFrameTransformAction.Request request, - List tasks, List taskOperationFailures, - List failedNodeExceptions) { + private void cancelDataFrameTask(String taskId, String dataFrameId, Exception exception, Consumer onFailure) { + persistentTasksService.sendRemoveRequest(taskId, + new ActionListener>() { + @Override + public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { + // We succeeded in cancelling the persistent task, but the + // problem that caused us to cancel it is the overall result + onFailure.accept(exception); + } - if (taskOperationFailures.isEmpty() == false) { - throw org.elasticsearch.ExceptionsHelper.convertToElastic(taskOperationFailures.get(0).getCause()); - } else if (failedNodeExceptions.isEmpty() == false) { - throw org.elasticsearch.ExceptionsHelper.convertToElastic(failedNodeExceptions.get(0)); + @Override + public void onFailure(Exception e) { + logger.error("[" + dataFrameId + "] Failed to cancel persistent task that could " + + "not be assigned due to [" + exception.getMessage() + "]", e); + onFailure.accept(exception); + } + } + ); + } + + private void waitForDataFrameTaskAllocated(String taskId, + DataFrameTransform params, + TimeValue timeout, + ActionListener listener) { + DataFramePredicate predicate = new DataFramePredicate(); + persistentTasksService.waitForPersistentTaskCondition(taskId, predicate, timeout, + new PersistentTasksService.WaitForPersistentTaskListener() { + @Override + public void onResponse(PersistentTasksCustomMetaData.PersistentTask + persistentTask) { + if (predicate.exception != null) { + // We want to return to the caller without leaving an unassigned persistent task + cancelDataFrameTask(taskId, params.getId(), predicate.exception, listener::onFailure); + } else { + listener.onResponse(true); + } + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + public void onTimeout(TimeValue timeout) { + listener.onFailure(new ElasticsearchException("Starting dataframe [" + + params.getId() + "] timed out after [" + timeout + "]")); + } + }); + } + + /** + * Important: the methods of this class must NOT throw exceptions. If they did then the callers + * of endpoints waiting for a condition tested by this predicate would never get a response. + */ + private class DataFramePredicate implements Predicate> { + + private volatile Exception exception; + + @Override + public boolean test(PersistentTasksCustomMetaData.PersistentTask persistentTask) { + if (persistentTask == null) { + return false; + } + PersistentTasksCustomMetaData.Assignment assignment = persistentTask.getAssignment(); + if (assignment != null && + assignment.equals(PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT) == false && + assignment.isAssigned() == false) { + // For some reason, the task is not assigned to a node, but is no longer in the `INITIAL_ASSIGNMENT` state + // Consider this a failure. + exception = new ElasticsearchStatusException("Could not start dataframe, allocation explanation [" + + assignment.getExplanation() + "]", RestStatus.TOO_MANY_REQUESTS); + return true; + } + // We just want it assigned so we can tell it to start working + return assignment != null && assignment.isAssigned(); } - - // Either the transform doesn't exist (the user didn't create it yet) or was deleted - // after the StartAPI executed. - // In either case, let the user know - if (tasks.size() == 0) { - throw new ResourceNotFoundException("Task for data frame transform [" + request.getId() + "] not found"); - } - - assert tasks.size() == 1; - - boolean allStarted = tasks.stream().allMatch(StartDataFrameTransformAction.Response::isStarted); - return new StartDataFrameTransformAction.Response(allStarted); } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java new file mode 100644 index 00000000000..9775acd2877 --- /dev/null +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java @@ -0,0 +1,115 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.dataframe.action; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.TaskOperationFailure; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.tasks.TransportTasksAction; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.license.LicenseUtils; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.XPackField; +import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction; +import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask; + +import java.util.List; +import java.util.function.Consumer; + +/** + * Internal only transport class to change an allocated persistent task's state to started + */ +public class TransportStartDataFrameTransformTaskAction extends + TransportTasksAction { + + private final XPackLicenseState licenseState; + + @Inject + public TransportStartDataFrameTransformTaskAction(TransportService transportService, ActionFilters actionFilters, + ClusterService clusterService, XPackLicenseState licenseState) { + super(StartDataFrameTransformTaskAction.NAME, clusterService, transportService, actionFilters, + StartDataFrameTransformTaskAction.Request::new, StartDataFrameTransformTaskAction.Response::new, + StartDataFrameTransformTaskAction.Response::new, ThreadPool.Names.SAME); + this.licenseState = licenseState; + } + + @Override + protected void processTasks(StartDataFrameTransformTaskAction.Request request, Consumer operation) { + DataFrameTransformTask matchingTask = null; + + // todo: re-factor, see rollup TransportTaskHelper + for (Task task : taskManager.getTasks().values()) { + if (task instanceof DataFrameTransformTask + && ((DataFrameTransformTask) task).getTransformId().equals(request.getId())) { + if (matchingTask != null) { + throw new IllegalArgumentException("Found more than one matching task for data frame transform [" + request.getId() + + "] when " + "there should only be one."); + } + matchingTask = (DataFrameTransformTask) task; + } + } + + if (matchingTask != null) { + operation.accept(matchingTask); + } + } + + @Override + protected void doExecute(Task task, StartDataFrameTransformTaskAction.Request request, + ActionListener listener) { + + if (!licenseState.isDataFrameAllowed()) { + listener.onFailure(LicenseUtils.newComplianceException(XPackField.DATA_FRAME)); + return; + } + + super.doExecute(task, request, listener); + } + + @Override + protected void taskOperation(StartDataFrameTransformTaskAction.Request request, DataFrameTransformTask transformTask, + ActionListener listener) { + if (transformTask.getTransformId().equals(request.getId())) { + transformTask.start(listener); + } else { + listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId() + + "] does not match request's ID [" + request.getId() + "]")); + } + } + + @Override + protected StartDataFrameTransformTaskAction.Response newResponse(StartDataFrameTransformTaskAction.Request request, + List tasks, + List taskOperationFailures, + List failedNodeExceptions) { + + if (taskOperationFailures.isEmpty() == false) { + throw org.elasticsearch.ExceptionsHelper.convertToElastic(taskOperationFailures.get(0).getCause()); + } else if (failedNodeExceptions.isEmpty() == false) { + throw org.elasticsearch.ExceptionsHelper.convertToElastic(failedNodeExceptions.get(0)); + } + + // Either the transform doesn't exist (the user didn't create it yet) or was deleted + // after the StartAPI executed. + // In either case, let the user know + if (tasks.size() == 0) { + throw new ResourceNotFoundException("Task for data frame transform [" + request.getId() + "] not found"); + } + + assert tasks.size() == 1; + + boolean allStarted = tasks.stream().allMatch(StartDataFrameTransformTaskAction.Response::isStarted); + return new StartDataFrameTransformTaskAction.Response(allStarted); + } +} diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java index b045eee6462..78e8425758f 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java @@ -7,8 +7,6 @@ package org.elasticsearch.xpack.dataframe.action; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchTimeoutException; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; @@ -22,10 +20,12 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; +import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask; import java.util.List; +import static org.elasticsearch.ExceptionsHelper.convertToElastic; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; public class TransportStopDataFrameTransformAction extends @@ -34,19 +34,26 @@ public class TransportStopDataFrameTransformAction extends private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100); private final ThreadPool threadPool; + private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager; @Inject public TransportStopDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters, - ClusterService clusterService, ThreadPool threadPool) { + ClusterService clusterService, ThreadPool threadPool, + DataFrameTransformsConfigManager dataFrameTransformsConfigManager) { super(StopDataFrameTransformAction.NAME, clusterService, transportService, actionFilters, StopDataFrameTransformAction.Request::new, StopDataFrameTransformAction.Response::new, StopDataFrameTransformAction.Response::new, ThreadPool.Names.SAME); this.threadPool = threadPool; + this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager; } @Override protected void doExecute(Task task, StopDataFrameTransformAction.Request request, ActionListener listener) { - super.doExecute(task, request, listener); + // Need to verify that the config actually exists + dataFrameTransformsConfigManager.getTransformConfiguration(request.getId(), ActionListener.wrap( + config -> super.doExecute(task, request, listener), + listener::onFailure + )); } @Override @@ -101,16 +108,23 @@ public class TransportStopDataFrameTransformAction extends List failedNodeExceptions) { if (taskOperationFailures.isEmpty() == false) { - throw ExceptionsHelper.convertToElastic(taskOperationFailures.get(0).getCause()); + throw convertToElastic(taskOperationFailures.get(0).getCause()); } else if (failedNodeExceptions.isEmpty() == false) { - throw ExceptionsHelper.convertToElastic(failedNodeExceptions.get(0)); + throw convertToElastic(failedNodeExceptions.get(0)); } // Either the transform doesn't exist (the user didn't create it yet) or was deleted // after the Stop API executed. // In either case, let the user know if (tasks.size() == 0) { - throw new ResourceNotFoundException("Task for Data Frame transform [" + request.getId() + "] not found"); + if (taskOperationFailures.isEmpty() == false) { + throw convertToElastic(taskOperationFailures.get(0).getCause()); + } else if (failedNodeExceptions.isEmpty() == false) { + throw convertToElastic(failedNodeExceptions.get(0)); + } else { + // This can happen we the actual task in the node no longer exists, or was never started + return new StopDataFrameTransformAction.Response(true); + } } assert tasks.size() == 1; @@ -118,4 +132,4 @@ public class TransportStopDataFrameTransformAction extends boolean allStopped = tasks.stream().allMatch(StopDataFrameTransformAction.Response::isStopped); return new StopDataFrameTransformAction.Response(allStopped); } -} \ No newline at end of file +} diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java index b2162377433..bbbe6defd69 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java @@ -20,8 +20,12 @@ import org.elasticsearch.action.get.GetAction; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -32,14 +36,21 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.function.Consumer; import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; @@ -47,6 +58,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; public class DataFrameTransformsConfigManager { private static final Logger logger = LogManager.getLogger(DataFrameTransformsConfigManager.class); + private static final int DEFAULT_SIZE = 100; public static final Map TO_XCONTENT_PARAMS = Collections.singletonMap(DataFrameField.FOR_INTERNAL_STORAGE, "true"); @@ -110,6 +122,53 @@ public class DataFrameTransformsConfigManager { })); } + /** + * Get more than one DataFrameTransformConfig + * + * @param transformId Can be a single transformId, `*`, or `_all` + * @param resultListener Listener to alert when request is completed + */ + // TODO add pagination support + public void getTransformConfigurations(String transformId, + ActionListener> resultListener) { + final boolean isAllOrWildCard = Strings.isAllOrWildcard(new String[]{transformId}); + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery("doc_type", DataFrameTransformConfig.NAME)); + if (isAllOrWildCard == false) { + queryBuilder.filter(QueryBuilders.termQuery(DataFrameField.ID.getPreferredName(), transformId)); + } + + SearchRequest request = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) + .setTrackTotalHits(true) + .setSize(DEFAULT_SIZE) + .setQuery(queryBuilder) + .request(); + + ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), DATA_FRAME_ORIGIN, request, + ActionListener.wrap( + searchResponse -> { + List configs = new ArrayList<>(searchResponse.getHits().getHits().length); + for (SearchHit hit : searchResponse.getHits().getHits()) { + DataFrameTransformConfig config = parseTransformLenientlyFromSourceSync(hit.getSourceRef(), + resultListener::onFailure); + if (config == null) { + return; + } + configs.add(config); + } + if (configs.isEmpty() && (isAllOrWildCard == false)) { + resultListener.onFailure(new ResourceNotFoundException( + DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId))); + return; + } + resultListener.onResponse(configs); + }, + resultListener::onFailure) + , client::search); + + } + public void deleteTransformConfiguration(String transformId, ActionListener listener) { DeleteRequest request = new DeleteRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameTransformConfig.documentId(transformId)); request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); @@ -132,6 +191,18 @@ public class DataFrameTransformsConfigManager { })); } + private DataFrameTransformConfig parseTransformLenientlyFromSourceSync(BytesReference source, Consumer onFailure) { + try (InputStream stream = source.streamInput(); + XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) { + return DataFrameTransformConfig.fromXContent(parser, null, true); + } catch (Exception e) { + logger.error(DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_PARSE_TRANSFORM_CONFIGURATION), e); + onFailure.accept(e); + return null; + } + } + private void parseTransformLenientlyFromSource(BytesReference source, String transformId, ActionListener transformListener) { try (InputStream stream = source.streamInput(); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index ace968ee8a6..e4446c65abe 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -31,8 +31,8 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event; -import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction; -import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction.Response; +import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction; +import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response; import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; @@ -141,7 +141,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S (task) -> { logger.debug("Successfully updated state for data frame transform [" + transform.getId() + "] to [" + state.getIndexerState() + "][" + state.getPosition() + "]"); - listener.onResponse(new StartDataFrameTransformAction.Response(true)); + listener.onResponse(new StartDataFrameTransformTaskAction.Response(true)); }, (exc) -> { // We were unable to update the persistent status, so we need to shutdown the indexer too. indexer.stop(); diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSetTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSetTests.java index 19ff320e8f3..410573aa158 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSetTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSetTests.java @@ -19,11 +19,15 @@ import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.XPackFeatureSet; import org.elasticsearch.xpack.core.XPackFeatureSet.Usage; +import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStatsTests; import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction; import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Response; +import org.elasticsearch.xpack.core.indexing.IndexerState; import org.junit.Before; import java.io.IOException; @@ -80,7 +84,21 @@ public class DataFrameFeatureSetTests extends ESTestCase { transformsStateAndStats.add(DataFrameTransformStateAndStatsTests.randomDataFrameTransformStateAndStats()); } + List transformConfigWithoutTasks = new ArrayList<>(); + for (int i = 0; i < randomIntBetween(0, 10); ++i) { + transformConfigWithoutTasks.add(DataFrameTransformConfigTests.randomDataFrameTransformConfig()); + } + + List transformConfigWithTasks = new ArrayList<>(transformsStateAndStats.size()); + transformsStateAndStats.forEach(stats -> + transformConfigWithTasks.add(DataFrameTransformConfigTests.randomDataFrameTransformConfig(stats.getId()))); + + List allConfigs = new ArrayList<>(transformConfigWithoutTasks.size() + transformConfigWithTasks.size()); + allConfigs.addAll(transformConfigWithoutTasks); + allConfigs.addAll(transformConfigWithTasks); + GetDataFrameTransformsStatsAction.Response mockResponse = new GetDataFrameTransformsStatsAction.Response(transformsStateAndStats); + GetDataFrameTransformsAction.Response mockTransformsResponse = new GetDataFrameTransformsAction.Response(allConfigs); doAnswer(invocationOnMock -> { @SuppressWarnings("unchecked") @@ -89,6 +107,14 @@ public class DataFrameFeatureSetTests extends ESTestCase { return Void.TYPE; }).when(client).execute(same(GetDataFrameTransformsStatsAction.INSTANCE), any(), any()); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = + (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(mockTransformsResponse); + return Void.TYPE; + }).when(client).execute(same(GetDataFrameTransformsAction.INSTANCE), any(), any()); + PlainActionFuture future = new PlainActionFuture<>(); featureSet.usage(future); XPackFeatureSet.Usage usage = future.get(); @@ -101,16 +127,18 @@ public class DataFrameFeatureSetTests extends ESTestCase { Map usageAsMap = parser.map(); assertTrue((boolean) XContentMapValues.extractValue("available", usageAsMap)); - if (transformsStateAndStats.isEmpty()) { + if (transformsStateAndStats.isEmpty() && transformConfigWithoutTasks.isEmpty()) { // no transforms, no stats assertEquals(null, XContentMapValues.extractValue("transforms", usageAsMap)); assertEquals(null, XContentMapValues.extractValue("stats", usageAsMap)); } else { - assertEquals(transformsStateAndStats.size(), XContentMapValues.extractValue("transforms._all", usageAsMap)); + assertEquals(transformsStateAndStats.size() + transformConfigWithoutTasks.size(), + XContentMapValues.extractValue("transforms._all", usageAsMap)); Map stateCounts = new HashMap<>(); transformsStateAndStats.stream().map(x -> x.getTransformState().getIndexerState().value()) .forEach(x -> stateCounts.merge(x, 1, Integer::sum)); + transformConfigWithoutTasks.forEach(ignored -> stateCounts.merge(IndexerState.STOPPED.value(), 1, Integer::sum)); stateCounts.forEach((k, v) -> assertEquals(v, XContentMapValues.extractValue("transforms." + k, usageAsMap))); DataFrameIndexerTransformStats combinedStats = transformsStateAndStats.stream().map(x -> x.getTransformStats()) diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml index 4cc58ed1e05..c8e7bc0a6ba 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml @@ -22,6 +22,13 @@ setup: - match: { count: 0 } - match: { transforms: [] } +--- +"Test get transform when it does not exist": + - do: + catch: /Transform with id \[missing-transform-id\] could not be found/ + data_frame.get_data_frame_transform: + transform_id: "missing-transform-id" + --- "Test delete transform when it does not exist": - do: @@ -32,7 +39,7 @@ setup: --- "Test put transform with invalid source index": - do: - catch: /Failed to validate data frame configuration/ + catch: /Source index \[missing-index\] does not exist/ data_frame.put_data_frame_transform: transform_id: "missing-source-transform" body: > @@ -98,13 +105,3 @@ setup: - match: { count: 2 } - match: { transforms.0.id: "airline-transform" } - match: { transforms.1.id: "airline-transform-dos" } - - - do: - data_frame.delete_data_frame_transform: - transform_id: "airline-transform" - - match: { acknowledged: true } - - - do: - data_frame.delete_data_frame_transform: - transform_id: "airline-transform-dos" - - match: { acknowledged: true } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml index 94281e2eb76..fd13e4dc6ea 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml @@ -63,6 +63,44 @@ teardown: data_frame.start_data_frame_transform: transform_id: "airline-transform-start-stop" +--- +"Test start/stop/start transform": + - do: + data_frame.start_data_frame_transform: + transform_id: "airline-transform-start-stop" + - match: { started: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "airline-transform-start-stop" + - match: { count: 1 } + - match: { transforms.0.id: "airline-transform-start-stop" } + - match: { transforms.0.state.transform_state: "started" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "airline-transform-start-stop" + - match: { stopped: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "airline-transform-start-stop" + - match: { count: 1 } + - match: { transforms.0.id: "airline-transform-start-stop" } + - match: { transforms.0.state.transform_state: "stopped" } + + - do: + data_frame.start_data_frame_transform: + transform_id: "airline-transform-start-stop" + - match: { started: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "airline-transform-start-stop" + - match: { count: 1 } + - match: { transforms.0.id: "airline-transform-start-stop" } + - match: { transforms.0.state.transform_state: "started" } + --- "Test stop missing transform": - do: diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml index 69926fc6cfc..1c89058ed4a 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml @@ -25,9 +25,16 @@ setup: "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} } } + - do: + data_frame.start_data_frame_transform: + transform_id: "airline-transform-stats" --- teardown: + - do: + data_frame.stop_data_frame_transform: + transform_id: "airline-transform-stats" + - do: data_frame.delete_data_frame_transform: transform_id: "airline-transform-stats" @@ -39,7 +46,7 @@ teardown: transform_id: "airline-transform-stats" - match: { count: 1 } - match: { transforms.0.id: "airline-transform-stats" } - - match: { transforms.0.state.transform_state: "stopped" } + - match: { transforms.0.state.transform_state: "started" } - match: { transforms.0.state.generation: 0 } - match: { transforms.0.stats.pages_processed: 0 } - match: { transforms.0.stats.documents_processed: 0 } @@ -74,6 +81,9 @@ teardown: "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} } } + - do: + data_frame.start_data_frame_transform: + transform_id: "airline-transform-stats-dos" - do: data_frame.get_data_frame_transform_stats: transform_id: "*" @@ -88,6 +98,9 @@ teardown: - match: { transforms.0.id: "airline-transform-stats" } - match: { transforms.1.id: "airline-transform-stats-dos" } + - do: + data_frame.stop_data_frame_transform: + transform_id: "airline-transform-stats-dos" - do: data_frame.delete_data_frame_transform: transform_id: "airline-transform-stats-dos"