[Data Frame] Refactor PUT transform to not create a task (#39934) (#40010)

* [Data Frame] Refactor PUT transform such that:

 * POST _start creates the task and starts it
 * GET transforms queries docs instead of tasks
 * POST _stop verifies the stored config exists before trying to stop
the task

* Addressing PR comments

* Refactoring DataFrameFeatureSet#usage, decreasing the size returned by getTransformConfigurations

* fixing failing usage test
This commit is contained in:
Benjamin Trent 2019-03-13 17:08:15 -05:00 committed by GitHub
parent a3ef565a25
commit 8c6ff5de31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 881 additions and 269 deletions

View File

@ -20,16 +20,17 @@ public class DataFrameMessages {
public static final String REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION =
"Failed to validate data frame configuration";
public static final String REST_PUT_DATA_FRAME_FAILED_PERSIST_TRANSFORM_CONFIGURATION = "Failed to persist data frame configuration";
public static final String REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_TARGET_MAPPINGS = "Failed to deduce target mappings";
public static final String REST_PUT_DATA_FRAME_FAILED_TO_CREATE_TARGET_INDEX = "Failed to create target index";
public static final String REST_PUT_DATA_FRAME_FAILED_TO_START_PERSISTENT_TASK =
"Failed to start persistent task, configuration has been cleaned up: [{0}]";
public static final String REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_DEST_MAPPINGS = "Failed to deduce dest mappings";
public static final String REST_PUT_DATA_FRAME_FAILED_TO_CREATE_DEST_INDEX = "Failed to create dest index";
public static final String REST_PUT_DATA_FRAME_DEST_INDEX_ALREADY_EXISTS = "dest index [{0}] already exists";
public static final String REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING = "Source index [{0}] does not exist";
public static final String REST_PUT_DATA_FRAME_INCONSISTENT_ID =
"Inconsistent id; ''{0}'' specified in the body differs from ''{1}'' specified as a URL argument";
public static final String DATA_FRAME_CONFIG_INVALID = "Data frame transform configuration is invalid [{0}]";
public static final String REST_DATA_FRAME_FAILED_TO_SERIALIZE_TRANSFORM = "Failed to serialise transform [{0}]";
public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform[{1}]";
public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform [{1}]";
public static final String FAILED_TO_LOAD_TRANSFORM_CONFIGURATION =
"Failed to load data frame transform configuration for transform [{0}]";
public static final String FAILED_TO_PARSE_TRANSFORM_CONFIGURATION =

View File

@ -8,12 +8,10 @@ package org.elasticsearch.xpack.core.dataframe.action;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.ParseField;
@ -25,7 +23,6 @@ import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
@ -52,7 +49,7 @@ public class GetDataFrameTransformsAction extends Action<GetDataFrameTransformsA
return new Response();
}
public static class Request extends BaseTasksRequest<Request> implements ToXContent {
public static class Request extends ActionRequest implements ToXContent {
private String id;
public Request(String id) {
@ -63,23 +60,14 @@ public class GetDataFrameTransformsAction extends Action<GetDataFrameTransformsA
}
}
private Request() {}
public Request() {
}
public Request(StreamInput in) throws IOException {
super(in);
id = in.readString();
}
@Override
public boolean match(Task task) {
// If we are retrieving all the transforms, the task description does not contain the id
if (id.equals(MetaData.ALL)) {
return task.getDescription().startsWith(DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX);
}
// Otherwise find the task by ID
return task.getDescription().equals(DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + id);
}
public String getId() {
return id;
}
@ -126,7 +114,7 @@ public class GetDataFrameTransformsAction extends Action<GetDataFrameTransformsA
}
}
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
public static class Response extends ActionResponse implements Writeable, ToXContentObject {
public static final String INVALID_TRANSFORMS_DEPRECATION_WARNING = "Found [{}] invalid transforms";
private static final ParseField INVALID_TRANSFORMS = new ParseField("invalid_transforms");
@ -134,22 +122,14 @@ public class GetDataFrameTransformsAction extends Action<GetDataFrameTransformsA
private List<DataFrameTransformConfig> transformConfigurations;
public Response(List<DataFrameTransformConfig> transformConfigs) {
super(Collections.emptyList(), Collections.emptyList());
this.transformConfigurations = transformConfigs;
}
public Response(List<DataFrameTransformConfig> transformConfigs, List<TaskOperationFailure> taskFailures,
List<? extends FailedNodeException> nodeFailures) {
super(taskFailures, nodeFailures);
this.transformConfigurations = transformConfigs;
}
public Response() {
super(Collections.emptyList(), Collections.emptyList());
this.transformConfigurations = Collections.emptyList();
}
public Response(StreamInput in) throws IOException {
super(in);
readFrom(in);
}
@ -173,7 +153,6 @@ public class GetDataFrameTransformsAction extends Action<GetDataFrameTransformsA
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
List<String> invalidTransforms = new ArrayList<>();
builder.startObject();
toXContentCommon(builder, params);
builder.field(DataFrameField.COUNT.getPreferredName(), transformConfigurations.size());
// XContentBuilder does not support passing the params object for Iterables
builder.field(DataFrameField.TRANSFORMS.getPreferredName());

View File

@ -9,7 +9,7 @@ package org.elasticsearch.xpack.core.dataframe.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.io.stream.StreamInput;
@ -39,14 +39,15 @@ public class StartDataFrameTransformAction extends Action<StartDataFrameTransfor
return new Response();
}
public static class Request extends BaseTasksRequest<Request> implements ToXContent {
public static class Request extends AcknowledgedRequest<Request> implements ToXContent {
private String id;
public Request(String id) {
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
}
private Request() {
public Request() {
}
public Request(StreamInput in) throws IOException {

View File

@ -0,0 +1,164 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Collections;
import java.util.Objects;
/**
 * Internal action that starts the already-allocated persistent task for a data frame
 * transform. Registered in the DataFrame plugin alongside the user-facing
 * StartDataFrameTransformAction; presumably the latter creates the persistent task and
 * then invokes this one to kick it off — confirm against TransportStartDataFrameTransformAction.
 */
public class StartDataFrameTransformTaskAction extends Action<StartDataFrameTransformTaskAction.Response> {

    public static final StartDataFrameTransformTaskAction INSTANCE = new StartDataFrameTransformTaskAction();
    // "admin" cluster privilege namespace: this is not the user-facing _start endpoint action name.
    public static final String NAME = "cluster:admin/data_frame/start_task";

    private StartDataFrameTransformTaskAction() {
        super(NAME);
    }

    @Override
    public Response newResponse() {
        return new Response();
    }

    /**
     * Task-targeted request carrying only the transform id. Extends BaseTasksRequest so the
     * transport layer can route it to the node running the transform's allocated task.
     */
    public static class Request extends BaseTasksRequest<Request> implements ToXContent {

        private String id;

        public Request(String id) {
            // id is required; fail fast with a field-specific message.
            this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
        }

        // No-arg constructor for transport-layer instantiation before stream deserialization.
        public Request() {
        }

        public Request(StreamInput in) throws IOException {
            super(in);
            id = in.readString();
        }

        public String getId() {
            return id;
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            // Wire order must mirror Request(StreamInput): superclass fields first, then id.
            super.writeTo(out);
            out.writeString(id);
        }

        @Override
        public ActionRequestValidationException validate() {
            // Nothing to validate beyond the non-null id enforced in the constructor.
            return null;
        }

        @Override
        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
            // Emits a bare field; the caller is responsible for startObject()/endObject().
            builder.field(DataFrameField.ID.getPreferredName(), id);
            return builder;
        }

        @Override
        public int hashCode() {
            return Objects.hash(id);
        }

        @Override
        public boolean equals(Object obj) {
            // NOTE(review): no `this == obj` fast path (unlike Response.equals); harmless but inconsistent.
            if (obj == null) {
                return false;
            }
            if (getClass() != obj.getClass()) {
                return false;
            }
            Request other = (Request) obj;
            return Objects.equals(id, other.id);
        }
    }

    public static class RequestBuilder extends ActionRequestBuilder<Request, Response> {

        protected RequestBuilder(ElasticsearchClient client, StartDataFrameTransformTaskAction action) {
            super(client, action, new Request());
        }
    }

    /**
     * Response carrying a single boolean: whether the task was started.
     * Inherits task/node failure collections from BaseTasksResponse (always empty here).
     */
    public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {

        private boolean started;

        public Response() {
            super(Collections.emptyList(), Collections.emptyList());
        }

        public Response(StreamInput in) throws IOException {
            // NOTE(review): super(in) followed by readFrom(in) -> super.readFrom(in); verify the
            // superclass does not consume its stream fields twice in this Streamable-era hybrid.
            super(in);
            readFrom(in);
        }

        public Response(boolean started) {
            super(Collections.emptyList(), Collections.emptyList());
            this.started = started;
        }

        public boolean isStarted() {
            return started;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            started = in.readBoolean();
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            // Mirrors readFrom: superclass fields, then the started flag.
            super.writeTo(out);
            out.writeBoolean(started);
        }

        @Override
        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
            builder.startObject();
            // Includes inherited task/node failures in the rendered object.
            toXContentCommon(builder, params);
            builder.field("started", started);
            builder.endObject();
            return builder;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            Response response = (Response) obj;
            // Equality intentionally ignores inherited failure lists; only the flag matters.
            return started == response.started;
        }

        @Override
        public int hashCode() {
            return Objects.hash(started);
        }
    }
}

View File

@ -36,7 +36,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
*/
public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransformConfig> implements Writeable, ToXContentObject {
private static final String NAME = "data_frame_transform_config";
public static final String NAME = "data_frame_transform_config";
public static final ParseField HEADERS = new ParseField("headers");
public static final ParseField QUERY = new ParseField("query");

View File

@ -218,8 +218,11 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
protected static String getDataFrameIndexerState(String transformId) throws IOException {
Response statsResponse = client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + transformId + "/_stats"));
Map<?, ?> transformStatsAsMap = (Map<?, ?>) ((List<?>) entityAsMap(statsResponse).get("transforms")).get(0);
List<?> transforms = ((List<?>) entityAsMap(statsResponse).get("transforms"));
if (transforms.isEmpty()) {
return null;
}
Map<?, ?> transformStatsAsMap = (Map<?, ?>) transforms.get(0);
return (String) XContentMapValues.extractValue("state.transform_state", transformStatsAsMap);
}
@ -234,7 +237,6 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
protected static void wipeDataFrameTransforms() throws IOException, InterruptedException {
List<Map<String, Object>> transformConfigs = getDataFrameTransforms();
for (Map<String, Object> transformConfig : transformConfigs) {
String transformId = (String) transformConfig.get("id");
Request request = new Request("POST", DATAFRAME_ENDPOINT + transformId + "/_stop");
@ -242,7 +244,10 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
request.addParameter("timeout", "10s");
request.addParameter("ignore", "404");
adminClient().performRequest(request);
assertEquals("stopped", getDataFrameIndexerState(transformId));
String state = getDataFrameIndexerState(transformId);
if (state != null) {
assertEquals("stopped", getDataFrameIndexerState(transformId));
}
}
for (Map<String, Object> transformConfig : transformConfigs) {

View File

@ -35,7 +35,7 @@ public class DataFrameUsageIT extends DataFrameRestTestCase {
indicesCreated = true;
}
public void testUsage() throws IOException {
public void testUsage() throws Exception {
Response usageResponse = client().performRequest(new Request("GET", "_xpack/usage"));
Map<?, ?> usageAsMap = entityAsMap(usageResponse);
@ -47,12 +47,20 @@ public class DataFrameUsageIT extends DataFrameRestTestCase {
// create a transform
createPivotReviewsTransform("test_usage", "pivot_reviews", null);
usageResponse = client().performRequest(new Request("GET", "_xpack/usage"));
usageAsMap = entityAsMap(usageResponse);
assertEquals(1, XContentMapValues.extractValue("data_frame.transforms._all", usageAsMap));
assertEquals(1, XContentMapValues.extractValue("data_frame.transforms.stopped", usageAsMap));
// TODO remove as soon as stats are stored in an index instead of ClusterState with the task
startAndWaitForTransform("test_usage", "pivot_reviews");
usageResponse = client().performRequest(new Request("GET", "_xpack/usage"));
usageAsMap = entityAsMap(usageResponse);
// we should see some stats
assertEquals(1, XContentMapValues.extractValue("data_frame.transforms._all", usageAsMap));
assertEquals(1, XContentMapValues.extractValue("data_frame.transforms.started", usageAsMap));
assertEquals(0, XContentMapValues.extractValue("data_frame.stats.index_failures", usageAsMap));
}
}

View File

@ -46,6 +46,7 @@ import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStats
import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.dataframe.action.TransportDeleteDataFrameTransformAction;
@ -54,6 +55,7 @@ import org.elasticsearch.xpack.dataframe.action.TransportGetDataFrameTransformsS
import org.elasticsearch.xpack.dataframe.action.TransportPreviewDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.action.TransportPutDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransformTaskAction;
import org.elasticsearch.xpack.dataframe.action.TransportStopDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
@ -148,6 +150,7 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
return Arrays.asList(
new ActionHandler<>(PutDataFrameTransformAction.INSTANCE, TransportPutDataFrameTransformAction.class),
new ActionHandler<>(StartDataFrameTransformAction.INSTANCE, TransportStartDataFrameTransformAction.class),
new ActionHandler<>(StartDataFrameTransformTaskAction.INSTANCE, TransportStartDataFrameTransformTaskAction.class),
new ActionHandler<>(StopDataFrameTransformAction.INSTANCE, TransportStopDataFrameTransformAction.class),
new ActionHandler<>(DeleteDataFrameTransformAction.INSTANCE, TransportDeleteDataFrameTransformAction.class),
new ActionHandler<>(GetDataFrameTransformsAction.INSTANCE, TransportGetDataFrameTransformsAction.class),

View File

@ -17,13 +17,18 @@ import org.elasticsearch.xpack.core.XPackFeatureSet;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.dataframe.DataFrameFeatureSetUsage;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
public class DataFrameFeatureSet implements XPackFeatureSet {
@ -71,18 +76,33 @@ public class DataFrameFeatureSet implements XPackFeatureSet {
return;
}
GetDataFrameTransformsStatsAction.Request transformStatsRequest = new GetDataFrameTransformsStatsAction.Request(MetaData.ALL);
GetDataFrameTransformsAction.Request transformsRequest = new GetDataFrameTransformsAction.Request(MetaData.ALL);
client.execute(GetDataFrameTransformsAction.INSTANCE, transformsRequest, ActionListener.wrap(
transforms -> {
Set<String> transformIds = transforms.getTransformConfigurations()
.stream()
.map(DataFrameTransformConfig::getId)
.collect(Collectors.toSet());
GetDataFrameTransformsStatsAction.Request transformStatsRequest =
new GetDataFrameTransformsStatsAction.Request(MetaData.ALL);
client.execute(GetDataFrameTransformsStatsAction.INSTANCE,
transformStatsRequest,
ActionListener.wrap(transformStatsResponse -> {
Map<String, Long> transformsCountByState = new HashMap<>();
DataFrameIndexerTransformStats accumulatedStats = new DataFrameIndexerTransformStats();
client.execute(GetDataFrameTransformsStatsAction.INSTANCE, transformStatsRequest, ActionListener.wrap(transformStatsResponse -> {
Map<String, Long> transformsCountByState = new HashMap<>();
DataFrameIndexerTransformStats accumulatedStats = new DataFrameIndexerTransformStats();
transformStatsResponse.getTransformsStateAndStats().forEach(singleResult -> {
transformIds.remove(singleResult.getId());
transformsCountByState.merge(singleResult.getTransformState().getIndexerState().value(), 1L, Long::sum);
accumulatedStats.merge(singleResult.getTransformStats());
});
// If there is no state returned, assumed stopped
transformIds.forEach(ignored -> transformsCountByState.merge(IndexerState.STOPPED.value(), 1L, Long::sum));
transformStatsResponse.getTransformsStateAndStats().stream().forEach(singleResult -> {
transformsCountByState.merge(singleResult.getTransformState().getIndexerState().value(), 1L, Long::sum);
accumulatedStats.merge(singleResult.getTransformStats());
});
listener.onResponse(new DataFrameFeatureSetUsage(available(), enabled(), transformsCountByState, accumulatedStats));
}, listener::onFailure));
listener.onResponse(new DataFrameFeatureSetUsage(available(), enabled(), transformsCountByState, accumulatedStats));
}, listener::onFailure));
},
listener::onFailure
));
}
}

View File

@ -7,98 +7,34 @@
package org.elasticsearch.xpack.dataframe.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction.Request;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction.Response;
import org.elasticsearch.xpack.dataframe.persistence.DataFramePersistentTaskUtils;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
public class TransportGetDataFrameTransformsAction extends
TransportTasksAction<DataFrameTransformTask,
GetDataFrameTransformsAction.Request,
GetDataFrameTransformsAction.Response,
GetDataFrameTransformsAction.Response> {
public class TransportGetDataFrameTransformsAction extends HandledTransportAction<Request, Response> {
private final DataFrameTransformsConfigManager transformsConfigManager;
@Inject
public TransportGetDataFrameTransformsAction(TransportService transportService, ActionFilters actionFilters,
ClusterService clusterService, DataFrameTransformsConfigManager transformsConfigManager) {
super(GetDataFrameTransformsAction.NAME, clusterService, transportService, actionFilters, GetDataFrameTransformsAction.Request::new,
GetDataFrameTransformsAction.Response::new, GetDataFrameTransformsAction.Response::new, ThreadPool.Names.SAME);
DataFrameTransformsConfigManager transformsConfigManager) {
super(GetDataFrameTransformsAction.NAME, transportService, actionFilters, () -> new Request());
this.transformsConfigManager = transformsConfigManager;
}
@Override
protected Response newResponse(Request request, List<Response> tasks, List<TaskOperationFailure> taskOperationFailures,
List<FailedNodeException> failedNodeExceptions) {
List<DataFrameTransformConfig> configs = tasks.stream()
.flatMap(r -> r.getTransformConfigurations().stream())
.sorted(Comparator.comparing(DataFrameTransformConfig::getId))
.collect(Collectors.toList());
return new Response(configs, taskOperationFailures, failedNodeExceptions);
}
@Override
protected void taskOperation(Request request, DataFrameTransformTask task, ActionListener<Response> listener) {
assert task.getTransformId().equals(request.getId()) || request.getId().equals(MetaData.ALL);
// Little extra insurance, make sure we only return transforms that aren't cancelled
if (task.isCancelled() == false) {
transformsConfigManager.getTransformConfiguration(task.getTransformId(), ActionListener.wrap(config -> {
listener.onResponse(new Response(Collections.singletonList(config)));
}, e -> {
listener.onFailure(new RuntimeException("failed to retrieve...", e));
}));
} else {
listener.onResponse(new Response(Collections.emptyList()));
}
}
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
final ClusterState state = clusterService.state();
final DiscoveryNodes nodes = state.nodes();
if (nodes.isLocalNodeElectedMaster()) {
if (DataFramePersistentTaskUtils.stateHasDataFrameTransforms(request.getId(), state)) {
super.doExecute(task, request, listener);
} else {
// If we couldn't find the transform in the persistent task CS, it means it was deleted prior to this GET
// and we can just send an empty response, no need to go looking for the allocated task
listener.onResponse(new Response(Collections.emptyList()));
}
} else {
// Delegates GetTransforms to elected master node, so it becomes the coordinating node.
// Non-master nodes may have a stale cluster state that shows transforms which are cancelled
// on the master, which makes testing difficult.
if (nodes.getMasterNode() == null) {
listener.onFailure(new MasterNotDiscoveredException("no known master nodes"));
} else {
transportService.sendRequest(nodes.getMasterNode(), actionName, request,
new ActionListenerResponseHandler<>(listener, Response::new));
}
}
//TODO support comma delimited and simple regex IDs
transformsConfigManager.getTransformConfigurations(request.getId(), ActionListener.wrap(
configs -> listener.onResponse(new Response(configs)),
listener::onFailure
));
}
}

View File

@ -73,6 +73,7 @@ public class TransportGetDataFrameTransformsStatsAction extends
}
@Override
// TODO gather stats from docs when moved out of allocated task
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
final ClusterState state = clusterService.state();
final DiscoveryNodes nodes = state.nodes();

View File

@ -8,9 +8,13 @@ package org.elasticsearch.xpack.dataframe.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
@ -18,27 +22,39 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction.Request;
import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction.Response;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction;
import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest;
import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesResponse;
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
import org.elasticsearch.xpack.core.security.authz.permission.ResourcePrivileges;
import org.elasticsearch.xpack.core.security.support.Exceptions;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.persistence.DataframeIndex;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
import java.io.IOException;
import java.util.Map;
import java.util.stream.Collectors;
@ -48,21 +64,23 @@ public class TransportPutDataFrameTransformAction
private static final Logger logger = LogManager.getLogger(TransportPutDataFrameTransformAction.class);
private final XPackLicenseState licenseState;
private final PersistentTasksService persistentTasksService;
private final Client client;
private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
private final SecurityContext securityContext;
@Inject
public TransportPutDataFrameTransformAction(TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, XPackLicenseState licenseState,
PersistentTasksService persistentTasksService, DataFrameTransformsConfigManager dataFrameTransformsConfigManager,
Client client) {
public TransportPutDataFrameTransformAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, XPackLicenseState licenseState,
PersistentTasksService persistentTasksService,
DataFrameTransformsConfigManager dataFrameTransformsConfigManager, Client client) {
super(PutDataFrameTransformAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
PutDataFrameTransformAction.Request::new);
this.licenseState = licenseState;
this.persistentTasksService = persistentTasksService;
this.client = client;
this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings) ?
new SecurityContext(settings, threadPool.getThreadContext()) : null;
}
@Override
@ -101,62 +119,130 @@ public class TransportPutDataFrameTransformAction
return;
}
// create the transform, for now we only have pivot and no support for custom queries
Pivot pivot = new Pivot(config.getSource(), new MatchAllQueryBuilder(), config.getPivotConfig());
String[] dest = indexNameExpressionResolver.concreteIndexNames(clusterState,
IndicesOptions.lenientExpandOpen(),
config.getDestination());
// the non-state creating steps are done first, so we minimize the chance to end up with orphaned state transform validation
pivot.validate(client, ActionListener.wrap(validationResult -> {
// deduce target mappings
pivot.deduceMappings(client, ActionListener.wrap(mappings -> {
// create the destination index
DataframeIndex.createDestinationIndex(client, config, mappings, ActionListener.wrap(createIndexResult -> {
DataFrameTransform transform = createDataFrameTransform(transformId, threadPool);
// create the transform configuration and store it in the internal index
dataFrameTransformsConfigManager.putTransformConfiguration(config, ActionListener.wrap(r -> {
// finally start the persistent task
persistentTasksService.sendStartRequest(transform.getId(), DataFrameTransform.NAME, transform,
ActionListener.wrap(persistentTask -> {
listener.onResponse(new PutDataFrameTransformAction.Response(true));
}, startPersistentTaskException -> {
// delete the otherwise orphaned transform configuration, for now we do not delete the destination index
dataFrameTransformsConfigManager.deleteTransformConfiguration(transformId, ActionListener.wrap(r2 -> {
logger.debug("Deleted data frame transform [{}] configuration from data frame configuration index",
transformId);
listener.onFailure(
new RuntimeException(
DataFrameMessages.getMessage(
DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_START_PERSISTENT_TASK, r2),
startPersistentTaskException));
}, deleteTransformFromIndexException -> {
logger.error("Failed to cleanup orphaned data frame transform [{}] configuration", transformId);
listener.onFailure(
new RuntimeException(
DataFrameMessages.getMessage(
DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_START_PERSISTENT_TASK, false),
startPersistentTaskException));
}));
}));
}, listener::onFailure));
}, createDestinationIndexException -> {
listener.onFailure(new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_CREATE_TARGET_INDEX,
createDestinationIndexException));
}));
}, deduceTargetMappingsException -> {
listener.onFailure(new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_TARGET_MAPPINGS,
deduceTargetMappingsException));
}));
}, validationException -> {
listener.onFailure(new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION,
validationException));
}));
}
if (dest.length > 0) {
listener.onFailure(new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_INDEX_ALREADY_EXISTS, config.getDestination()),
RestStatus.BAD_REQUEST));
return;
}
private static DataFrameTransform createDataFrameTransform(String transformId, ThreadPool threadPool) {
return new DataFrameTransform(transformId);
String[] src = indexNameExpressionResolver.concreteIndexNames(clusterState,
IndicesOptions.lenientExpandOpen(),
config.getSource());
if (src.length == 0) {
listener.onFailure(new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING, config.getSource()),
RestStatus.BAD_REQUEST));
return;
}
// Early check to verify that the user can create the destination index and can read from the source
if (licenseState.isAuthAllowed()) {
final String username = securityContext.getUser().principal();
RoleDescriptor.IndicesPrivileges sourceIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder()
.indices(config.getSource())
.privileges("read")
.build();
RoleDescriptor.IndicesPrivileges destIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder()
.indices(config.getDestination())
.privileges("read", "index", "create_index")
.build();
HasPrivilegesRequest privRequest = new HasPrivilegesRequest();
privRequest.applicationPrivileges(new RoleDescriptor.ApplicationResourcePrivileges[0]);
privRequest.username(username);
privRequest.clusterPrivileges(Strings.EMPTY_ARRAY);
privRequest.indexPrivileges(sourceIndexPrivileges, destIndexPrivileges);
ActionListener<HasPrivilegesResponse> privResponseListener = ActionListener.wrap(
r -> handlePrivsResponse(username, config, r, listener),
listener::onFailure);
client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
} else { // No security enabled, just create the transform
putDataFrame(config, listener);
}
}
@Override
protected ClusterBlockException checkBlock(PutDataFrameTransformAction.Request request, ClusterState state) {
    // Refuse to run while cluster-wide metadata writes are blocked: this action
    // stores a config document and may create the destination index.
    return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
/**
 * Callback for the HasPrivileges check performed before creating a transform.
 * On a complete privilege match the transform is created; otherwise an
 * authorization error is returned that lists, rendered as JSON, the per-index
 * privileges the user is missing.
 *
 * @throws IOException if rendering the privilege map to JSON fails
 */
private void handlePrivsResponse(String username,
                                 DataFrameTransformConfig config,
                                 HasPrivilegesResponse privilegesResponse,
                                 ActionListener<Response> listener) throws IOException {
    if (privilegesResponse.isCompleteMatch()) {
        putDataFrame(config, listener);
    } else {
        XContentBuilder builder = JsonXContent.contentBuilder();
        builder.startObject();
        for (ResourcePrivileges index : privilegesResponse.getIndexPrivileges()) {
            builder.field(index.getResource());
            builder.map(index.getPrivileges());
        }
        builder.endObject();
        listener.onFailure(Exceptions.authorizationError("Cannot create data frame transform [{}]" +
                " because user {} lacks permissions on the indices: {}",
                config.getId(), username, Strings.toString(builder)));
    }
}
/**
 * Creates the transform: validates the pivot, deduces the destination mappings,
 * creates the destination index, and finally persists the config document.
 * The steps are chained through listeners and appear bottom-up in the code
 * (step &lt;1&gt; executes first, &lt;5&gt; last). If persisting the config fails, the
 * destination index that was just created is deleted so nothing is left orphaned.
 */
private void putDataFrame(DataFrameTransformConfig config, ActionListener<Response> listener) {
    // Only pivot transforms exist for now; the query comes straight from the config.
    final Pivot pivot = new Pivot(config.getSource(), config.getQueryConfig().getQuery(), config.getPivotConfig());

    // <5> Return the listener, or clean up destination index on failure.
    ActionListener<Boolean> putTransformConfigurationListener = ActionListener.wrap(
        putTransformConfigurationResult -> listener.onResponse(new Response(true)),
        putTransformConfigurationException ->
            ClientHelper.executeAsyncWithOrigin(client,
                ClientHelper.DATA_FRAME_ORIGIN,
                DeleteIndexAction.INSTANCE,
                new DeleteIndexRequest(config.getDestination()), ActionListener.wrap(
                    // Cleanup succeeded: surface the original config-write failure.
                    deleteIndexResponse -> listener.onFailure(putTransformConfigurationException),
                    deleteIndexException -> {
                        String msg = "Failed to delete destination index after creating transform [" + config.getId() + "] failed";
                        // Cleanup also failed: report the cleanup failure with the
                        // original config-write failure attached as the cause.
                        listener.onFailure(
                            new ElasticsearchStatusException(msg,
                                RestStatus.INTERNAL_SERVER_ERROR,
                                putTransformConfigurationException));
                    })
            )
    );

    // <4> Put our transform
    ActionListener<Boolean> createDestinationIndexListener = ActionListener.wrap(
        createIndexResult -> dataFrameTransformsConfigManager.putTransformConfiguration(config, putTransformConfigurationListener),
        createDestinationIndexException -> listener.onFailure(
            new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_CREATE_DEST_INDEX,
                createDestinationIndexException))
    );

    // <3> Create the destination index
    ActionListener<Map<String, String>> deduceMappingsListener = ActionListener.wrap(
        mappings -> DataframeIndex.createDestinationIndex(client, config, mappings, createDestinationIndexListener),
        deduceTargetMappingsException -> listener.onFailure(
            new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_DEST_MAPPINGS,
                deduceTargetMappingsException))
    );

    // <2> Deduce our mappings for the destination index
    ActionListener<Boolean> pivotValidationListener = ActionListener.wrap(
        validationResult -> pivot.deduceMappings(client, deduceMappingsListener),
        validationException -> listener.onFailure(
            new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION,
                validationException))
    );

    // <1> Validate our pivot
    pivot.validate(client, pivotValidationListener);
}
}

View File

@ -6,106 +6,238 @@
package org.elasticsearch.xpack.dataframe.action;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import java.util.List;
import java.util.Collection;
import java.util.function.Consumer;
import java.util.function.Predicate;
public class TransportStartDataFrameTransformAction extends
TransportTasksAction<DataFrameTransformTask, StartDataFrameTransformAction.Request,
StartDataFrameTransformAction.Response, StartDataFrameTransformAction.Response> {
TransportMasterNodeAction<StartDataFrameTransformAction.Request, StartDataFrameTransformAction.Response> {
private final XPackLicenseState licenseState;
private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
private final PersistentTasksService persistentTasksService;
private final Client client;
@Inject
public TransportStartDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters,
ClusterService clusterService, XPackLicenseState licenseState) {
super(StartDataFrameTransformAction.NAME, clusterService, transportService, actionFilters,
StartDataFrameTransformAction.Request::new, StartDataFrameTransformAction.Response::new,
StartDataFrameTransformAction.Response::new, ThreadPool.Names.SAME);
ClusterService clusterService, XPackLicenseState licenseState,
ThreadPool threadPool, IndexNameExpressionResolver indexNameExpressionResolver,
DataFrameTransformsConfigManager dataFrameTransformsConfigManager,
PersistentTasksService persistentTasksService, Client client) {
super(StartDataFrameTransformAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
StartDataFrameTransformAction.Request::new);
this.licenseState = licenseState;
this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
this.persistentTasksService = persistentTasksService;
this.client = client;
}
@Override
protected void processTasks(StartDataFrameTransformAction.Request request, Consumer<DataFrameTransformTask> operation) {
DataFrameTransformTask matchingTask = null;
// todo: re-factor, see rollup TransportTaskHelper
for (Task task : taskManager.getTasks().values()) {
if (task instanceof DataFrameTransformTask
&& ((DataFrameTransformTask) task).getTransformId().equals(request.getId())) {
if (matchingTask != null) {
throw new IllegalArgumentException("Found more than one matching task for data frame transform [" + request.getId()
+ "] when " + "there should only be one.");
}
matchingTask = (DataFrameTransformTask) task;
}
}
if (matchingTask != null) {
operation.accept(matchingTask);
}
// Run on the calling thread; the master operation delegates all real work to
// async listeners, so no dedicated executor is needed.
protected String executor() {
    return ThreadPool.Names.SAME;
}
@Override
protected void doExecute(Task task, StartDataFrameTransformAction.Request request,
ActionListener<StartDataFrameTransformAction.Response> listener) {
// Blank response instance required by the TransportMasterNodeAction contract.
protected StartDataFrameTransformAction.Response newResponse() {
    return new StartDataFrameTransformAction.Response();
}
@Override
protected void masterOperation(StartDataFrameTransformAction.Request request,
ClusterState state,
ActionListener<StartDataFrameTransformAction.Response> listener) throws Exception {
if (!licenseState.isDataFrameAllowed()) {
listener.onFailure(LicenseUtils.newComplianceException(XPackField.DATA_FRAME));
return;
}
final DataFrameTransform transformTask = createDataFrameTransform(request.getId(), threadPool);
super.doExecute(task, request, listener);
// <3> Set the allocated task's state to STARTED
ActionListener<PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform>> persistentTaskActionListener = ActionListener.wrap(
task -> {
waitForDataFrameTaskAllocated(task.getId(),
transformTask,
request.timeout(),
ActionListener.wrap(
taskAssigned -> ClientHelper.executeAsyncWithOrigin(client,
ClientHelper.DATA_FRAME_ORIGIN,
StartDataFrameTransformTaskAction.INSTANCE,
new StartDataFrameTransformTaskAction.Request(request.getId()),
ActionListener.wrap(
r -> listener.onResponse(new StartDataFrameTransformAction.Response(true)),
startingFailure -> cancelDataFrameTask(task.getId(),
transformTask.getId(),
startingFailure,
listener::onFailure)
)),
listener::onFailure));
},
listener::onFailure
);
// <2> Create the task in cluster state so that it will start executing on the node
ActionListener<DataFrameTransformConfig> getTransformListener = ActionListener.wrap(
config -> {
if (config.isValid() == false) {
listener.onFailure(new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_CONFIG_INVALID, request.getId()),
RestStatus.BAD_REQUEST
));
return;
}
PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform> existingTask =
getExistingTask(transformTask.getId(), state);
if (existingTask == null) {
persistentTasksService.sendStartRequest(transformTask.getId(),
DataFrameTransform.NAME,
transformTask,
persistentTaskActionListener);
} else {
persistentTaskActionListener.onResponse(existingTask);
}
},
listener::onFailure
);
// <1> Get the config to verify it exists and is valid
dataFrameTransformsConfigManager.getTransformConfiguration(request.getId(), getTransformListener);
}
@Override
protected void taskOperation(StartDataFrameTransformAction.Request request, DataFrameTransformTask transformTask,
ActionListener<StartDataFrameTransformAction.Response> listener) {
if (transformTask.getTransformId().equals(request.getId())) {
transformTask.start(listener);
protected ClusterBlockException checkBlock(StartDataFrameTransformAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
// Builds the persistent-task params object for a transform id.
// NOTE(review): the threadPool argument is unused in this body - confirm whether
// it can be dropped from the signature.
private static DataFrameTransform createDataFrameTransform(String transformId, ThreadPool threadPool) {
    return new DataFrameTransform(transformId);
}
@SuppressWarnings("unchecked")
private static PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform> getExistingTask(String id, ClusterState state) {
PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (pTasksMeta == null) {
return null;
}
Collection<PersistentTasksCustomMetaData.PersistentTask<?>> existingTask = pTasksMeta.findTasks(DataFrameTransform.NAME,
t -> t.getId().equals(id));
if (existingTask.isEmpty()) {
return null;
} else {
listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId()
+ "] does not match request's ID [" + request.getId() + "]"));
assert(existingTask.size() == 1);
PersistentTasksCustomMetaData.PersistentTask<?> pTask = existingTask.iterator().next();
if (pTask.getParams() instanceof DataFrameTransform) {
return (PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform>)pTask;
}
throw new ElasticsearchStatusException("Found data frame transform persistent task [" + id + "] with incorrect params",
RestStatus.INTERNAL_SERVER_ERROR);
}
}
@Override
protected StartDataFrameTransformAction.Response newResponse(StartDataFrameTransformAction.Request request,
List<StartDataFrameTransformAction.Response> tasks, List<TaskOperationFailure> taskOperationFailures,
List<FailedNodeException> failedNodeExceptions) {
private void cancelDataFrameTask(String taskId, String dataFrameId, Exception exception, Consumer<Exception> onFailure) {
persistentTasksService.sendRemoveRequest(taskId,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> task) {
// We succeeded in cancelling the persistent task, but the
// problem that caused us to cancel it is the overall result
onFailure.accept(exception);
}
if (taskOperationFailures.isEmpty() == false) {
throw org.elasticsearch.ExceptionsHelper.convertToElastic(taskOperationFailures.get(0).getCause());
} else if (failedNodeExceptions.isEmpty() == false) {
throw org.elasticsearch.ExceptionsHelper.convertToElastic(failedNodeExceptions.get(0));
@Override
public void onFailure(Exception e) {
logger.error("[" + dataFrameId + "] Failed to cancel persistent task that could " +
"not be assigned due to [" + exception.getMessage() + "]", e);
onFailure.accept(exception);
}
}
);
}
// Waits (up to timeout) for the newly created persistent task to be assigned to a
// node. If the predicate captured an assignment failure, the task is removed again
// so no unassigned task lingers, and that failure is reported instead of success.
private void waitForDataFrameTaskAllocated(String taskId,
                                           DataFrameTransform params,
                                           TimeValue timeout,
                                           ActionListener<Boolean> listener) {
    DataFramePredicate predicate = new DataFramePredicate();
    persistentTasksService.waitForPersistentTaskCondition(taskId, predicate, timeout,
        new PersistentTasksService.WaitForPersistentTaskListener<DataFrameTransform>() {
            @Override
            public void onResponse(PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform>
                                       persistentTask) {
                if (predicate.exception != null) {
                    // We want to return to the caller without leaving an unassigned persistent task
                    cancelDataFrameTask(taskId, params.getId(), predicate.exception, listener::onFailure);
                } else {
                    listener.onResponse(true);
                }
            }

            @Override
            public void onFailure(Exception e) {
                listener.onFailure(e);
            }

            @Override
            public void onTimeout(TimeValue timeout) {
                listener.onFailure(new ElasticsearchException("Starting dataframe ["
                    + params.getId() + "] timed out after [" + timeout + "]"));
            }
        });
}
/**
* Important: the methods of this class must NOT throw exceptions. If they did then the callers
* of endpoints waiting for a condition tested by this predicate would never get a response.
*/
private class DataFramePredicate implements Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> {
private volatile Exception exception;
@Override
public boolean test(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
    // Task not yet visible in cluster state - keep waiting.
    if (persistentTask == null) {
        return false;
    }
    PersistentTasksCustomMetaData.Assignment assignment = persistentTask.getAssignment();
    if (assignment != null &&
        assignment.equals(PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT) == false &&
        assignment.isAssigned() == false) {
        // For some reason, the task is not assigned to a node, but is no longer in the `INITIAL_ASSIGNMENT` state
        // Consider this a failure.
        // Store the failure rather than throwing; per the class contract this
        // predicate must never throw, so the caller inspects `exception` instead.
        exception = new ElasticsearchStatusException("Could not start dataframe, allocation explanation [" +
            assignment.getExplanation() + "]", RestStatus.TOO_MANY_REQUESTS);
        return true;
    }
    // We just want it assigned so we can tell it to start working
    return assignment != null && assignment.isAssigned();
}
// Either the transform doesn't exist (the user didn't create it yet) or was deleted
// after the StartAPI executed.
// In either case, let the user know
if (tasks.size() == 0) {
throw new ResourceNotFoundException("Task for data frame transform [" + request.getId() + "] not found");
}
assert tasks.size() == 1;
boolean allStarted = tasks.stream().allMatch(StartDataFrameTransformAction.Response::isStarted);
return new StartDataFrameTransformAction.Response(allStarted);
}
}

View File

@ -0,0 +1,115 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.action;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
import java.util.List;
import java.util.function.Consumer;
/**
* Internal only transport class to change an allocated persistent task's state to started
*/
public class TransportStartDataFrameTransformTaskAction extends
    TransportTasksAction<DataFrameTransformTask, StartDataFrameTransformTaskAction.Request,
        StartDataFrameTransformTaskAction.Response, StartDataFrameTransformTaskAction.Response> {

    private final XPackLicenseState licenseState;

    @Inject
    public TransportStartDataFrameTransformTaskAction(TransportService transportService, ActionFilters actionFilters,
                                                      ClusterService clusterService, XPackLicenseState licenseState) {
        super(StartDataFrameTransformTaskAction.NAME, clusterService, transportService, actionFilters,
            StartDataFrameTransformTaskAction.Request::new, StartDataFrameTransformTaskAction.Response::new,
            StartDataFrameTransformTaskAction.Response::new, ThreadPool.Names.SAME);
        this.licenseState = licenseState;
    }

    /**
     * Narrows the node-level task list down to the single DataFrameTransformTask
     * whose transform id matches the request; fails fast if more than one matches.
     */
    @Override
    protected void processTasks(StartDataFrameTransformTaskAction.Request request, Consumer<DataFrameTransformTask> operation) {
        DataFrameTransformTask matchingTask = null;

        // todo: re-factor, see rollup TransportTaskHelper
        for (Task task : taskManager.getTasks().values()) {
            if (task instanceof DataFrameTransformTask
                && ((DataFrameTransformTask) task).getTransformId().equals(request.getId())) {
                if (matchingTask != null) {
                    throw new IllegalArgumentException("Found more than one matching task for data frame transform [" + request.getId()
                        + "] when " + "there should only be one.");
                }
                matchingTask = (DataFrameTransformTask) task;
            }
        }

        if (matchingTask != null) {
            operation.accept(matchingTask);
        }
    }

    /** Rejects the request up front when the license does not allow data frames. */
    @Override
    protected void doExecute(Task task, StartDataFrameTransformTaskAction.Request request,
                             ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
        if (!licenseState.isDataFrameAllowed()) {
            listener.onFailure(LicenseUtils.newComplianceException(XPackField.DATA_FRAME));
            return;
        }

        super.doExecute(task, request, listener);
    }

    /** Starts the matched allocated task; rejects a task whose id does not match. */
    @Override
    protected void taskOperation(StartDataFrameTransformTaskAction.Request request, DataFrameTransformTask transformTask,
                                 ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
        if (transformTask.getTransformId().equals(request.getId())) {
            transformTask.start(listener);
        } else {
            listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId()
                + "] does not match request's ID [" + request.getId() + "]"));
        }
    }

    /**
     * Reduces the per-task responses: rethrows the first task or node failure,
     * returns 404 when no task matched, otherwise reports whether the single
     * matched task reported itself as started.
     */
    @Override
    protected StartDataFrameTransformTaskAction.Response newResponse(StartDataFrameTransformTaskAction.Request request,
                                                                     List<StartDataFrameTransformTaskAction.Response> tasks,
                                                                     List<TaskOperationFailure> taskOperationFailures,
                                                                     List<FailedNodeException> failedNodeExceptions) {
        if (taskOperationFailures.isEmpty() == false) {
            throw org.elasticsearch.ExceptionsHelper.convertToElastic(taskOperationFailures.get(0).getCause());
        } else if (failedNodeExceptions.isEmpty() == false) {
            throw org.elasticsearch.ExceptionsHelper.convertToElastic(failedNodeExceptions.get(0));
        }

        // Either the transform doesn't exist (the user didn't create it yet) or was deleted
        // after the StartAPI executed.
        // In either case, let the user know
        if (tasks.size() == 0) {
            throw new ResourceNotFoundException("Task for data frame transform [" + request.getId() + "] not found");
        }

        assert tasks.size() == 1;

        boolean allStarted = tasks.stream().allMatch(StartDataFrameTransformTaskAction.Response::isStarted);
        return new StartDataFrameTransformTaskAction.Response(allStarted);
    }
}

View File

@ -7,8 +7,6 @@ package org.elasticsearch.xpack.dataframe.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
@ -22,10 +20,12 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
import java.util.List;
import static org.elasticsearch.ExceptionsHelper.convertToElastic;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
public class TransportStopDataFrameTransformAction extends
@ -34,19 +34,26 @@ public class TransportStopDataFrameTransformAction extends
private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100);
private final ThreadPool threadPool;
private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
@Inject
public TransportStopDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters,
ClusterService clusterService, ThreadPool threadPool) {
ClusterService clusterService, ThreadPool threadPool,
DataFrameTransformsConfigManager dataFrameTransformsConfigManager) {
super(StopDataFrameTransformAction.NAME, clusterService, transportService, actionFilters, StopDataFrameTransformAction.Request::new,
StopDataFrameTransformAction.Response::new, StopDataFrameTransformAction.Response::new, ThreadPool.Names.SAME);
this.threadPool = threadPool;
this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
}
@Override
protected void doExecute(Task task, StopDataFrameTransformAction.Request request,
ActionListener<StopDataFrameTransformAction.Response> listener) {
super.doExecute(task, request, listener);
// Need to verify that the config actually exists
dataFrameTransformsConfigManager.getTransformConfiguration(request.getId(), ActionListener.wrap(
config -> super.doExecute(task, request, listener),
listener::onFailure
));
}
@Override
@ -101,16 +108,23 @@ public class TransportStopDataFrameTransformAction extends
List<FailedNodeException> failedNodeExceptions) {
if (taskOperationFailures.isEmpty() == false) {
throw ExceptionsHelper.convertToElastic(taskOperationFailures.get(0).getCause());
throw convertToElastic(taskOperationFailures.get(0).getCause());
} else if (failedNodeExceptions.isEmpty() == false) {
throw ExceptionsHelper.convertToElastic(failedNodeExceptions.get(0));
throw convertToElastic(failedNodeExceptions.get(0));
}
// Either the transform doesn't exist (the user didn't create it yet) or was deleted
// after the Stop API executed.
// In either case, let the user know
if (tasks.size() == 0) {
throw new ResourceNotFoundException("Task for Data Frame transform [" + request.getId() + "] not found");
if (taskOperationFailures.isEmpty() == false) {
throw convertToElastic(taskOperationFailures.get(0).getCause());
} else if (failedNodeExceptions.isEmpty() == false) {
throw convertToElastic(failedNodeExceptions.get(0));
} else {
// This can happen we the actual task in the node no longer exists, or was never started
return new StopDataFrameTransformAction.Response(true);
}
}
assert tasks.size() == 1;

View File

@ -20,8 +20,12 @@ import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -32,14 +36,21 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
@ -47,6 +58,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
public class DataFrameTransformsConfigManager {
private static final Logger logger = LogManager.getLogger(DataFrameTransformsConfigManager.class);
private static final int DEFAULT_SIZE = 100;
public static final Map<String, String> TO_XCONTENT_PARAMS = Collections.singletonMap(DataFrameField.FOR_INTERNAL_STORAGE, "true");
@ -110,6 +122,53 @@ public class DataFrameTransformsConfigManager {
}));
}
/**
* Get more than one DataFrameTransformConfig
*
* @param transformId Can be a single transformId, `*`, or `_all`
* @param resultListener Listener to alert when request is completed
*/
// TODO add pagination support
public void getTransformConfigurations(String transformId,
                                       ActionListener<List<DataFrameTransformConfig>> resultListener) {
    final boolean isAllOrWildCard = Strings.isAllOrWildcard(new String[]{transformId});
    // Always restrict the search to transform config documents; add the id filter
    // only when a concrete id (not `*`/`_all`) was requested.
    BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery()
        .filter(QueryBuilders.termQuery("doc_type", DataFrameTransformConfig.NAME));
    if (isAllOrWildCard == false) {
        queryBuilder.filter(QueryBuilders.termQuery(DataFrameField.ID.getPreferredName(), transformId));
    }

    SearchRequest request = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
        .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
        .setTrackTotalHits(true)
        .setSize(DEFAULT_SIZE)
        .setQuery(queryBuilder)
        .request();

    ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), DATA_FRAME_ORIGIN, request,
        ActionListener.<SearchResponse>wrap(
            searchResponse -> {
                List<DataFrameTransformConfig> configs = new ArrayList<>(searchResponse.getHits().getHits().length);
                for (SearchHit hit : searchResponse.getHits().getHits()) {
                    DataFrameTransformConfig config = parseTransformLenientlyFromSourceSync(hit.getSourceRef(),
                        resultListener::onFailure);
                    // null means the parse failed and the failure was already
                    // reported through the listener; stop processing further hits.
                    if (config == null) {
                        return;
                    }
                    configs.add(config);
                }
                // An explicit id that matched nothing is a 404; an empty wildcard
                // result is returned as a legitimate empty list.
                if (configs.isEmpty() && (isAllOrWildCard == false)) {
                    resultListener.onFailure(new ResourceNotFoundException(
                        DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId)));
                    return;
                }
                resultListener.onResponse(configs);
            },
            resultListener::onFailure)
        , client::search);
}
public void deleteTransformConfiguration(String transformId, ActionListener<Boolean> listener) {
DeleteRequest request = new DeleteRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameTransformConfig.documentId(transformId));
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
@ -132,6 +191,18 @@ public class DataFrameTransformsConfigManager {
}));
}
/**
 * Synchronously parse a stored transform configuration document.
 *
 * <p>Parsing is lenient (unknown fields are ignored). On any failure the
 * error is logged, {@code onFailure} is invoked with the exception, and
 * {@code null} is returned so the caller can stop processing.
 *
 * @param source raw document source to parse
 * @param onFailure callback invoked when parsing fails
 * @return the parsed configuration, or {@code null} on failure
 */
private DataFrameTransformConfig parseTransformLenientlyFromSourceSync(BytesReference source, Consumer<Exception> onFailure) {
    try (InputStream in = source.streamInput();
         XContentParser xContentParser = XContentFactory.xContent(XContentType.JSON)
                 .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, in)) {
        // last argument enables lenient parsing of the stored document
        return DataFrameTransformConfig.fromXContent(xContentParser, null, true);
    } catch (Exception ex) {
        logger.error(DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_PARSE_TRANSFORM_CONFIGURATION), ex);
        onFailure.accept(ex);
        return null;
    }
}
private void parseTransformLenientlyFromSource(BytesReference source, String transformId,
ActionListener<DataFrameTransformConfig> transformListener) {
try (InputStream stream = source.streamInput();

View File

@ -31,8 +31,8 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction.Response;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
@ -141,7 +141,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
(task) -> {
logger.debug("Successfully updated state for data frame transform [" + transform.getId() + "] to ["
+ state.getIndexerState() + "][" + state.getPosition() + "]");
listener.onResponse(new StartDataFrameTransformAction.Response(true));
listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
}, (exc) -> {
// We were unable to update the persistent status, so we need to shutdown the indexer too.
indexer.stop();

View File

@ -19,11 +19,15 @@ import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.XPackFeatureSet;
import org.elasticsearch.xpack.core.XPackFeatureSet.Usage;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStatsTests;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Response;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.junit.Before;
import java.io.IOException;
@ -80,7 +84,21 @@ public class DataFrameFeatureSetTests extends ESTestCase {
transformsStateAndStats.add(DataFrameTransformStateAndStatsTests.randomDataFrameTransformStateAndStats());
}
List<DataFrameTransformConfig> transformConfigWithoutTasks = new ArrayList<>();
for (int i = 0; i < randomIntBetween(0, 10); ++i) {
transformConfigWithoutTasks.add(DataFrameTransformConfigTests.randomDataFrameTransformConfig());
}
List<DataFrameTransformConfig> transformConfigWithTasks = new ArrayList<>(transformsStateAndStats.size());
transformsStateAndStats.forEach(stats ->
transformConfigWithTasks.add(DataFrameTransformConfigTests.randomDataFrameTransformConfig(stats.getId())));
List<DataFrameTransformConfig> allConfigs = new ArrayList<>(transformConfigWithoutTasks.size() + transformConfigWithTasks.size());
allConfigs.addAll(transformConfigWithoutTasks);
allConfigs.addAll(transformConfigWithTasks);
GetDataFrameTransformsStatsAction.Response mockResponse = new GetDataFrameTransformsStatsAction.Response(transformsStateAndStats);
GetDataFrameTransformsAction.Response mockTransformsResponse = new GetDataFrameTransformsAction.Response(allConfigs);
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
@ -89,6 +107,14 @@ public class DataFrameFeatureSetTests extends ESTestCase {
return Void.TYPE;
}).when(client).execute(same(GetDataFrameTransformsStatsAction.INSTANCE), any(), any());
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<GetDataFrameTransformsAction.Response> listener =
(ActionListener<GetDataFrameTransformsAction.Response>) invocationOnMock.getArguments()[2];
listener.onResponse(mockTransformsResponse);
return Void.TYPE;
}).when(client).execute(same(GetDataFrameTransformsAction.INSTANCE), any(), any());
PlainActionFuture<Usage> future = new PlainActionFuture<>();
featureSet.usage(future);
XPackFeatureSet.Usage usage = future.get();
@ -101,16 +127,18 @@ public class DataFrameFeatureSetTests extends ESTestCase {
Map<String, Object> usageAsMap = parser.map();
assertTrue((boolean) XContentMapValues.extractValue("available", usageAsMap));
if (transformsStateAndStats.isEmpty()) {
if (transformsStateAndStats.isEmpty() && transformConfigWithoutTasks.isEmpty()) {
// no transforms, no stats
assertEquals(null, XContentMapValues.extractValue("transforms", usageAsMap));
assertEquals(null, XContentMapValues.extractValue("stats", usageAsMap));
} else {
assertEquals(transformsStateAndStats.size(), XContentMapValues.extractValue("transforms._all", usageAsMap));
assertEquals(transformsStateAndStats.size() + transformConfigWithoutTasks.size(),
XContentMapValues.extractValue("transforms._all", usageAsMap));
Map<String, Integer> stateCounts = new HashMap<>();
transformsStateAndStats.stream().map(x -> x.getTransformState().getIndexerState().value())
.forEach(x -> stateCounts.merge(x, 1, Integer::sum));
transformConfigWithoutTasks.forEach(ignored -> stateCounts.merge(IndexerState.STOPPED.value(), 1, Integer::sum));
stateCounts.forEach((k, v) -> assertEquals(v, XContentMapValues.extractValue("transforms." + k, usageAsMap)));
DataFrameIndexerTransformStats combinedStats = transformsStateAndStats.stream().map(x -> x.getTransformStats())

View File

@ -22,6 +22,13 @@ setup:
- match: { count: 0 }
- match: { transforms: [] }
---
"Test get transform when it does not exist":
- do:
catch: /Transform with id \[missing-transform-id\] could not be found/
data_frame.get_data_frame_transform:
transform_id: "missing-transform-id"
---
"Test delete transform when it does not exist":
- do:
@ -32,7 +39,7 @@ setup:
---
"Test put transform with invalid source index":
- do:
catch: /Failed to validate data frame configuration/
catch: /Source index \[missing-index\] does not exist/
data_frame.put_data_frame_transform:
transform_id: "missing-source-transform"
body: >
@ -98,13 +105,3 @@ setup:
- match: { count: 2 }
- match: { transforms.0.id: "airline-transform" }
- match: { transforms.1.id: "airline-transform-dos" }
- do:
data_frame.delete_data_frame_transform:
transform_id: "airline-transform"
- match: { acknowledged: true }
- do:
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-dos"
- match: { acknowledged: true }

View File

@ -63,6 +63,44 @@ teardown:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-start-stop"
---
"Test start/stop/start transform":
- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-start-stop"
- match: { started: true }
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "airline-transform-start-stop"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-stop" }
- match: { transforms.0.state.transform_state: "started" }
- do:
data_frame.stop_data_frame_transform:
transform_id: "airline-transform-start-stop"
- match: { stopped: true }
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "airline-transform-start-stop"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-stop" }
- match: { transforms.0.state.transform_state: "stopped" }
- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-start-stop"
- match: { started: true }
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "airline-transform-start-stop"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-stop" }
- match: { transforms.0.state.transform_state: "started" }
---
"Test stop missing transform":
- do:

View File

@ -25,9 +25,16 @@ setup:
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-stats"
---
teardown:
- do:
data_frame.stop_data_frame_transform:
transform_id: "airline-transform-stats"
- do:
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-stats"
@ -39,7 +46,7 @@ teardown:
transform_id: "airline-transform-stats"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-stats" }
- match: { transforms.0.state.transform_state: "stopped" }
- match: { transforms.0.state.transform_state: "started" }
- match: { transforms.0.state.generation: 0 }
- match: { transforms.0.stats.pages_processed: 0 }
- match: { transforms.0.stats.documents_processed: 0 }
@ -74,6 +81,9 @@ teardown:
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-stats-dos"
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "*"
@ -88,6 +98,9 @@ teardown:
- match: { transforms.0.id: "airline-transform-stats" }
- match: { transforms.1.id: "airline-transform-stats-dos" }
- do:
data_frame.stop_data_frame_transform:
transform_id: "airline-transform-stats-dos"
- do:
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-stats-dos"