[ML DataFrame] Data Frame stop all (#41156)

Wild card support for the data frame stop API
This commit is contained in:
David Kyle 2019-04-15 14:33:16 +01:00
parent 3f00c29adb
commit 2b539f8347
7 changed files with 218 additions and 16 deletions

View File

@ -12,7 +12,9 @@ It accepts a +{request}+ object and responds with a +{response}+ object.
[id="{upid}-{api}-request"]
==== Stop Data Frame Request
A +{request}+ object requires a non-null `id`.
A +{request}+ object requires a non-null `id`. `id` can be a comma separated list of Ids
or a single Id. Wildcards, `*` and `_all` are also accepted.
["source","java",subs="attributes,callouts,macros"]
---------------------------------------------------

View File

@ -10,9 +10,17 @@ Stops one or more {dataframe-transforms}.
==== Request
`POST _data_frame/transforms/<data_frame_transform_id>/_stop`
`POST _data_frame/transforms/<data_frame_transform_id>/_stop` +
//==== Description
`POST _data_frame/transforms/<data_frame_transform_id1>,<data_frame_transform_id2>/_stop` +
`POST _data_frame/transforms/_all/_stop`
==== Description
You can stop multiple {dataframe-transforms} in a single API request by using a
comma-separated list of {dataframe-transforms} or a wildcard expression.
All {dataframe-transforms} can be stopped by using `_all` or `*` as the `<data_frame_transform_id>`.
==== Path Parameters

View File

@ -21,8 +21,11 @@ import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
public class StopDataFrameTransformAction extends Action<StopDataFrameTransformAction.Response> {
@ -45,6 +48,7 @@ public class StopDataFrameTransformAction extends Action<StopDataFrameTransformA
private String id;
private final boolean waitForCompletion;
private final boolean force;
private Set<String> expandedIds;
public Request(String id, boolean waitForCompletion, boolean force, @Nullable TimeValue timeout) {
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
@ -64,6 +68,9 @@ public class StopDataFrameTransformAction extends Action<StopDataFrameTransformA
id = in.readString();
waitForCompletion = in.readBoolean();
force = in.readBoolean();
if (in.readBoolean()) {
expandedIds = new HashSet<>(Arrays.asList(in.readStringArray()));
}
}
public String getId() {
@ -82,12 +89,25 @@ public class StopDataFrameTransformAction extends Action<StopDataFrameTransformA
return force;
}
public Set<String> getExpandedIds() {
return expandedIds;
}
public void setExpandedIds(Set<String> expandedIds ) {
this.expandedIds = expandedIds;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
out.writeBoolean(waitForCompletion);
out.writeBoolean(force);
boolean hasExpandedIds = expandedIds != null;
out.writeBoolean(hasExpandedIds);
if (hasExpandedIds) {
out.writeStringArray(expandedIds.toArray(new String[0]));
}
}
@Override
@ -98,7 +118,7 @@ public class StopDataFrameTransformAction extends Action<StopDataFrameTransformA
@Override
public int hashCode() {
// the base class does not implement hashCode, therefore we need to hash timeout ourselves
return Objects.hash(id, waitForCompletion, force, this.getTimeout());
return Objects.hash(id, waitForCompletion, force, expandedIds, this.getTimeout());
}
@Override
@ -119,14 +139,20 @@ public class StopDataFrameTransformAction extends Action<StopDataFrameTransformA
return Objects.equals(id, other.id) &&
Objects.equals(waitForCompletion, other.waitForCompletion) &&
Objects.equals(force, other.force);
Objects.equals(force, other.force) &&
Objects.equals(expandedIds, other.expandedIds);
}
@Override
public boolean match(Task task) {
String expectedDescription = DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + id;
if (task.getDescription().startsWith(DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX)) {
String id = task.getDescription().substring(DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX.length());
if (expandedIds != null) {
return expandedIds.contains(id);
}
}
return task.getDescription().equals(expectedDescription);
return false;
}
}

View File

@ -8,15 +8,26 @@ package org.elasticsearch.xpack.core.dataframe.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction.Request;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
public class StopDataFrameTransformActionRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
protected Request createTestInstance() {
TimeValue timeout = randomBoolean() ? TimeValue.timeValueMinutes(randomIntBetween(1, 10)) : null;
return new Request(randomAlphaOfLengthBetween(1, 10), randomBoolean(), randomBoolean(), timeout);
Request request = new Request(randomAlphaOfLengthBetween(1, 10), randomBoolean(), randomBoolean(), timeout);
if (randomBoolean()) {
request.setExpandedIds(new HashSet<>(Arrays.asList(generateRandomStringArray(5, 6, false))));
}
return request;
}
@Override
@ -35,4 +46,25 @@ public class StopDataFrameTransformActionRequestTests extends AbstractWireSerial
assertNotEquals(r1,r2);
assertNotEquals(r1.hashCode(),r2.hashCode());
}
public void testMatch() {
String dataFrameId = "dataframe-id";
Task dataFrameTask = new Task(1L, "persistent", "action",
DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + dataFrameId,
TaskId.EMPTY_TASK_ID, Collections.emptyMap());
Request request = new Request("unrelated", false, false, null);
request.setExpandedIds(new HashSet<>(Arrays.asList("foo", "bar")));
assertFalse(request.match(dataFrameTask));
Request matchingRequest = new Request(dataFrameId, false, false, null);
matchingRequest.setExpandedIds(Collections.singleton(dataFrameId));
assertTrue(matchingRequest.match(dataFrameTask));
Task notADataFrameTask = new Task(1L, "persistent", "action",
"some other task, say monitoring",
TaskId.EMPTY_TASK_ID, Collections.emptyMap());
assertFalse(matchingRequest.match(notADataFrameTask));
}
}

View File

@ -13,20 +13,27 @@ import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.elasticsearch.ExceptionsHelper.convertToElastic;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
@ -52,9 +59,13 @@ public class TransportStopDataFrameTransformAction extends
@Override
protected void doExecute(Task task, StopDataFrameTransformAction.Request request,
ActionListener<StopDataFrameTransformAction.Response> listener) {
// Need to verify that the config actually exists
dataFrameTransformsConfigManager.getTransformConfiguration(request.getId(), ActionListener.wrap(
config -> super.doExecute(task, request, listener),
dataFrameTransformsConfigManager.expandTransformIds(request.getId(), new PageParams(0, 10_000), ActionListener.wrap(
expandedIds -> {
request.setExpandedIds(new HashSet<>(expandedIds));
request.setNodes(dataframeNodes(expandedIds, clusterService.state()));
super.doExecute(task, request, listener);
},
listener::onFailure
));
}
@ -62,7 +73,14 @@ public class TransportStopDataFrameTransformAction extends
@Override
protected void taskOperation(StopDataFrameTransformAction.Request request, DataFrameTransformTask transformTask,
ActionListener<StopDataFrameTransformAction.Response> listener) {
if (transformTask.getTransformId().equals(request.getId())) {
Set<String> ids = request.getExpandedIds();
if (ids == null) {
listener.onFailure(new IllegalStateException("Request does not have expandedIds set"));
return;
}
if (ids.contains(transformTask.getTransformId())) {
if (transformTask.getState().getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) {
listener.onFailure(
new ElasticsearchStatusException("Unable to stop data frame transform [" + request.getId()
@ -138,9 +156,28 @@ public class TransportStopDataFrameTransformAction extends
}
}
assert tasks.size() == 1;
boolean allStopped = tasks.stream().allMatch(StopDataFrameTransformAction.Response::isStopped);
return new StopDataFrameTransformAction.Response(allStopped);
}
static String[] dataframeNodes(List<String> dataFrameIds, ClusterState clusterState) {
Set<String> executorNodes = new HashSet<>();
PersistentTasksCustomMetaData tasksMetaData =
PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(clusterState);
if (tasksMetaData != null) {
Set<String> dataFrameIdsSet = new HashSet<>(dataFrameIds);
Collection<PersistentTasksCustomMetaData.PersistentTask<?>> tasks =
tasksMetaData.findTasks(DataFrameField.TASK_NAME, t -> dataFrameIdsSet.contains(t.getId()));
for (PersistentTasksCustomMetaData.PersistentTask<?> task : tasks) {
executorNodes.add(task.getExecutorNode());
}
}
return executorNodes.toArray(new String[0]);
}
}

View File

@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.action;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import java.util.Arrays;
import java.util.Collections;
import static org.hamcrest.Matchers.hasItemInArray;
public class TransportStopDataFrameTransformActionTests extends ESTestCase {
public void testDataframeNodes() {
String dataFrameIdFoo = "df-id-foo";
String dataFrameIdBar = "df-id-bar";
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
tasksBuilder.addTask(dataFrameIdFoo,
DataFrameField.TASK_NAME, new DataFrameTransform(dataFrameIdFoo),
new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment"));
tasksBuilder.addTask(dataFrameIdBar,
DataFrameField.TASK_NAME, new DataFrameTransform(dataFrameIdBar),
new PersistentTasksCustomMetaData.Assignment("node-2", "test assignment"));
tasksBuilder.addTask(MlTasks.jobTaskId("foo-1"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-1"),
new PersistentTasksCustomMetaData.Assignment("node-3", "test assignment"));
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()))
.build();
String[] nodes = TransportStopDataFrameTransformAction.dataframeNodes(Arrays.asList(dataFrameIdFoo, dataFrameIdBar), cs);
assertEquals(2, nodes.length);
assertThat(nodes, hasItemInArray("node-1"));
assertThat(nodes, hasItemInArray("node-2"));
}
public void testDataframeNodes_NoTasks() {
ClusterState emptyState = ClusterState.builder(new ClusterName("_name")).build();
String[] nodes = TransportStopDataFrameTransformAction.dataframeNodes(Collections.singletonList("df-id"), emptyState);
assertEquals(0, nodes.length);
}
}

View File

@ -179,3 +179,45 @@ teardown:
- do:
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-start-later"
---
"Test stop all":
- do:
data_frame.put_data_frame_transform:
transform_id: "airline-transform-stop-all"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-start-later" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-stop-all"
- match: { started: true }
- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-start-stop"
- match: { started: true }
- do:
data_frame.stop_data_frame_transform:
transform_id: "_all"
- match: { stopped: true }
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "*"
- match: { count: 2 }
- match: { transforms.0.state.indexer_state: "stopped" }
- match: { transforms.0.state.task_state: "stopped" }
- match: { transforms.1.state.indexer_state: "stopped" }
- match: { transforms.1.state.task_state: "stopped" }
- do:
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-stop-all"