diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedState.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedState.java index 796f7a766df..9fda5fb215a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedState.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedState.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.tasks.Task; +import org.elasticsearch.xpack.ml.action.StartDatafeedAction; import java.io.IOException; import java.util.Locale; @@ -22,7 +23,7 @@ public enum DatafeedState implements Task.Status { STARTED, STOPPED; - public static final String NAME = "DatafeedState"; + public static final String NAME = StartDatafeedAction.NAME; private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, args -> fromString((String) args[0])); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobTaskStatus.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobTaskStatus.java index 481a909468a..59415e2ead6 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobTaskStatus.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/JobTaskStatus.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.tasks.Task; +import org.elasticsearch.xpack.ml.action.OpenJobAction; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import java.io.IOException; @@ -22,7 +23,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constru public class JobTaskStatus implements Task.Status { - public static final String NAME = "JobState"; + public static final String NAME = OpenJobAction.NAME; private static ParseField STATE = new ParseField("state"); private static ParseField ALLOCATION_ID = new ParseField("allocation_id"); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java index 40276b5609c..c42160d24fd 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java @@ -69,16 +69,21 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable ASSIGNMENT_PARSER = new ConstructingObjectParser<>("assignment", objects -> new Assignment((String) objects[0], (String) objects[1])); - private static final NamedObjectParser PARAMS_PARSER = - (XContentParser p, Void c, String name) -> p.namedObject(PersistentTaskParams.class, name, null); - private static final NamedObjectParser STATUS_PARSER = - (XContentParser p, Void c, String name) -> p.namedObject(Status.class, name, null); + private static final NamedObjectParser, Void> TASK_DESCRIPTION_PARSER; static { // Tasks parser initialization PERSISTENT_TASKS_PARSER.declareLong(Builder::setLastAllocationId, new ParseField("last_allocation_id")); PERSISTENT_TASKS_PARSER.declareObjectArray(Builder::setTasks, PERSISTENT_TASK_PARSER, new ParseField("tasks")); + // Task description parser initialization + ObjectParser, String> parser = new ObjectParser<>("named"); + parser.declareObject(TaskDescriptionBuilder::setParams, + (p, c) -> p.namedObject(PersistentTaskParams.class, c, null), new ParseField("params")); + parser.declareObject(TaskDescriptionBuilder::setStatus, + (p, c) -> p.namedObject(Status.class, c, null), new ParseField("status")); + TASK_DESCRIPTION_PARSER = (XContentParser p, Void c, String name) -> parser.parse(p, new TaskDescriptionBuilder<>(name), name); + // Assignment parser ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("executor_node")); ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("explanation")); @@ -87,28 +92,46 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable taskBuilder, List objects) -> { - if (objects.size() != 1) { - throw new IllegalArgumentException("only one params per task is allowed"); - } - taskBuilder.setParams(objects.get(0)); - }, PARAMS_PARSER, new ParseField("params")); PERSISTENT_TASK_PARSER.declareNamedObjects( - (TaskBuilder taskBuilder, List objects) -> { + (TaskBuilder taskBuilder, List> objects) -> { if (objects.size() != 1) { - throw new IllegalArgumentException("only one status per task is allowed"); + throw new IllegalArgumentException("only one task description per task is allowed"); } - taskBuilder.setStatus(objects.get(0)); - }, STATUS_PARSER, new ParseField("status")); - - + TaskDescriptionBuilder builder = objects.get(0); + taskBuilder.setTaskName(builder.taskName); + taskBuilder.setParams(builder.params); + taskBuilder.setStatus(builder.status); + }, TASK_DESCRIPTION_PARSER, new ParseField("task")); PERSISTENT_TASK_PARSER.declareObject(TaskBuilder::setAssignment, ASSIGNMENT_PARSER, new ParseField("assignment")); PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationIdOnLastStatusUpdate, new ParseField("allocation_id_on_last_status_update")); } + /** + * Private builder used in XContent parser to build task-specific portion (params and status) + */ + private static class TaskDescriptionBuilder { + private final String taskName; + private Params params; + private Status status; + + private TaskDescriptionBuilder(String taskName) { + this.taskName = taskName; + } + + private TaskDescriptionBuilder setParams(Params params) { + this.params = params; + return this; + } + + private TaskDescriptionBuilder setStatus(Status status) { + this.status = status; + return this; + } + } + + public Collection> tasks() { return this.tasks.values(); } @@ -266,6 +289,18 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable listener) { - persistentTasksClusterService.createPersistentTask(request.taskId, request.action, request.params, + persistentTasksClusterService.createPersistentTask(request.taskId, request.taskName, request.params, new ActionListener>() { @Override diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/UpdatePersistentTaskStatusAction.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/UpdatePersistentTaskStatusAction.java index 851b30441ac..401f158b841 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/UpdatePersistentTaskStatusAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/UpdatePersistentTaskStatusAction.java @@ -9,7 +9,6 @@ import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.TransportMasterNodeAction; @@ -25,13 +24,14 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import java.io.IOException; import java.util.Objects; +import static org.elasticsearch.action.ValidateActions.addValidationError; + public class UpdatePersistentTaskStatusAction extends Action { @@ -57,7 +57,7 @@ public class UpdatePersistentTaskStatusAction extends Action task : tasksInProgress.tasks()) { // explanation should correspond to the action name - switch (task.getTaskName()) { - case "should_assign": + switch (((TestParams) task.getParams()).getTestParam()) { + case "assign_me": assertThat(task.getExecutorNode(), notNullValue()); assertThat(task.isAssigned(), equalTo(true)); if (clusterState.nodes().nodeExists(task.getExecutorNode()) == false) { @@ -140,7 +141,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { clusterState.nodes().nodeExists(task.getExecutorNode()), equalTo(true)); assertThat(task.getAssignment().getExplanation(), equalTo("test assignment")); break; - case "should_not_assign": + case "dont_assign_me": assertThat(task.getExecutorNode(), nullValue()); assertThat(task.isAssigned(), equalTo(false)); assertThat(task.getAssignment().getExplanation(), equalTo("no appropriate nodes found for the assignment")); @@ -196,7 +197,9 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { private Assignment assignOnlyOneTaskAtATime(ClusterState clusterState) { DiscoveryNodes nodes = clusterState.nodes(); PersistentTasksCustomMetaData tasksInProgress = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - if (tasksInProgress.findTasks("assign_one", task -> nodes.nodeExists(task.getExecutorNode())).isEmpty()) { + if (tasksInProgress.findTasks(TestPersistentTasksExecutor.NAME, task -> + "assign_one".equals(((TestParams) task.getParams()).getTestParam()) && + nodes.nodeExists(task.getExecutorNode())).isEmpty()) { return randomNodeAssignment(clusterState.nodes()); } else { return new Assignment(null, "only one task can be assigned at a time"); @@ -390,11 +393,12 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { MetaData.Builder metaData, PersistentTasksCustomMetaData.Builder tasks, Assignment assignment, String param) { return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasksCustomMetaData.TYPE, - tasks.addTask(UUIDs.base64UUID(), randomAlphaOfLength(10), new TestParams(param), assignment).build())); + tasks.addTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams(param), assignment).build())); } - private void addTask(PersistentTasksCustomMetaData.Builder tasks, String action, String param, String node) { - tasks.addTask(UUIDs.base64UUID(), action, new TestParams(param), new Assignment(node, "explanation: " + action)); + private void addTask(PersistentTasksCustomMetaData.Builder tasks, String param, String node) { + tasks.addTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams(param), + new Assignment(node, "explanation: " + param)); } private DiscoveryNode newNode(String nodeId) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java index 67de0f985e5..57a6a2e2812 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java @@ -68,7 +68,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ new Entry(MetaData.Custom.class, PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData::new), new Entry(NamedDiff.class, PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData::readDiffFrom), new Entry(PersistentTaskParams.class, TestPersistentTasksExecutor.NAME, TestParams::new), - new Entry(Task.Status.class, Status.NAME, Status::new) + new Entry(Task.Status.class, TestPersistentTasksExecutor.NAME, Status::new) )); } @@ -151,7 +151,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ return new NamedXContentRegistry(Arrays.asList( new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(TestPersistentTasksExecutor.NAME), TestParams::fromXContent), - new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(Status.NAME), Status::fromXContent) + new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(TestPersistentTasksExecutor.NAME), Status::fromXContent) )); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java index 19fc3a80f30..146474f34dc 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java @@ -189,9 +189,9 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { assertThrows(future1, IllegalStateException.class, "timed out after 10ms"); PlainActionFuture> failedUpdateFuture = new PlainActionFuture<>(); - persistentTasksService.updateStatus(taskId, -1, new Status("should fail"), failedUpdateFuture); + persistentTasksService.updateStatus(taskId, -2, new Status("should fail"), failedUpdateFuture); assertThrows(failedUpdateFuture, ResourceNotFoundException.class, "the task with id " + taskId + - " and allocation id -1 doesn't exist"); + " and allocation id -2 doesn't exist"); // Wait for the task to disappear WaitForPersistentTaskStatusFuture future2 = new WaitForPersistentTaskStatusFuture<>(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorResponseTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorResponseTests.java index c8c286b1db8..44273098612 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorResponseTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorResponseTests.java @@ -9,6 +9,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.test.AbstractStreamableTestCase; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import java.util.Collections; @@ -20,7 +21,7 @@ public class PersistentTasksExecutorResponseTests extends AbstractStreamableTest protected PersistentTaskResponse createTestInstance() { if (randomBoolean()) { return new PersistentTaskResponse( - new PersistentTask(UUIDs.base64UUID(), randomAsciiOfLength(10), + new PersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestPersistentTasksPlugin.TestParams("test"), randomLong(), PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT)); } else { @@ -37,7 +38,7 @@ public class PersistentTasksExecutorResponseTests extends AbstractStreamableTest protected NamedWriteableRegistry getNamedWriteableRegistry() { return new NamedWriteableRegistry(Collections.singletonList( new NamedWriteableRegistry.Entry(PersistentTaskParams.class, - TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME, TestPersistentTasksPlugin.TestParams::new) + TestPersistentTasksExecutor.NAME, TestPersistentTasksPlugin.TestParams::new) )); } } \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java index 003b5decc3c..cd64e503032 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestParams; +import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import java.io.IOException; import java.util.ArrayList; @@ -66,7 +67,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { PersistentTasksService persistentTasksService = mock(PersistentTasksService.class); @SuppressWarnings("unchecked") PersistentTasksExecutor action = mock(PersistentTasksExecutor.class); when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME); - when(action.getTaskName()).thenReturn("test"); + when(action.getTaskName()).thenReturn(TestPersistentTasksExecutor.NAME); int nonLocalNodesCount = randomInt(10); // need to account for 5 original tasks on each node and their relocations for (int i = 0; i < (nonLocalNodesCount + 1) * 10; i++) { @@ -87,11 +88,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { boolean added = false; if (nonLocalNodesCount > 0) { for (int i = 0; i < randomInt(5); i++) { - tasks.addTask(UUIDs.base64UUID(), "test_action", new TestParams("other_" + i), + tasks.addTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("other_" + i), new Assignment("other_node_" + randomInt(nonLocalNodesCount), "test assignment on other node")); if (added == false && randomBoolean()) { added = true; - tasks.addTask(UUIDs.base64UUID(), "test", new TestParams("this_param"), + tasks.addTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("this_param"), new Assignment("this_node", "test assignment on this node")); } } @@ -112,7 +113,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { // Add task on some other node state = newClusterState; - newClusterState = addTask(state, "test", null, "some_other_node"); + newClusterState = addTask(state, TestPersistentTasksExecutor.NAME, null, "some_other_node"); coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state)); // Make sure action wasn't called again @@ -120,7 +121,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { // Start another task on this node state = newClusterState; - newClusterState = addTask(state, "test", new TestParams("this_param"), "this_node"); + newClusterState = addTask(state, TestPersistentTasksExecutor.NAME, new TestParams("this_param"), "this_node"); coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state)); // Make sure action was called this time @@ -135,7 +136,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { // Add task on some other node state = newClusterState; - newClusterState = addTask(state, "test", null, "some_other_node"); + newClusterState = addTask(state, TestPersistentTasksExecutor.NAME, null, "some_other_node"); coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state)); // Make sure action wasn't called again diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentTasksPlugin.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentTasksPlugin.java index ddc37b3d5bc..5080eb5de2b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentTasksPlugin.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentTasksPlugin.java @@ -110,7 +110,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { PersistentTasksCustomMetaData::new), new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData::readDiffFrom), - new NamedWriteableRegistry.Entry(Task.Status.class, Status.NAME, Status::new) + new NamedWriteableRegistry.Entry(Task.Status.class, TestPersistentTasksExecutor.NAME, Status::new) ); } @@ -121,7 +121,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { PersistentTasksCustomMetaData::fromXContent), new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(TestPersistentTasksExecutor.NAME), TestParams::fromXContent), - new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(Status.NAME), Status::fromXContent) + new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(TestPersistentTasksExecutor.NAME), Status::fromXContent) ); } @@ -211,7 +211,6 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { } public static class Status implements Task.Status { - public static final String NAME = "test"; private final String phase; @@ -232,7 +231,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { @Override public String getWriteableName() { - return NAME; + return TestPersistentTasksExecutor.NAME; } @Override diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/UpdatePersistentTaskRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/UpdatePersistentTaskRequestTests.java index f5c3a38886f..729e0833a0e 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/UpdatePersistentTaskRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/UpdatePersistentTaskRequestTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.AbstractStreamableTestCase; import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.Status; +import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction.Request; import java.util.Collections; @@ -29,7 +30,7 @@ public class UpdatePersistentTaskRequestTests extends AbstractStreamableTestCase @Override protected NamedWriteableRegistry getNamedWriteableRegistry() { return new NamedWriteableRegistry(Collections.singletonList( - new NamedWriteableRegistry.Entry(Task.Status.class, Status.NAME, Status::new) + new NamedWriteableRegistry.Entry(Task.Status.class, TestPersistentTasksExecutor.NAME, Status::new) )); } } \ No newline at end of file diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml index a857a516fd2..045d54ceead 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yaml @@ -339,7 +339,7 @@ cluster.state: metric: [ metadata ] filter_path: metadata.persistent_tasks - - match: {metadata.persistent_tasks.tasks.0.status.JobState.state: opened} + - match: {"metadata.persistent_tasks.tasks.0.task.cluster:admin/xpack/ml/job/open.status.state": opened} - do: xpack.ml.close_job: @@ -396,7 +396,7 @@ cluster.state: metric: [ metadata ] filter_path: metadata.persistent_tasks - - match: {metadata.persistent_tasks.tasks.0.status.JobState.state: opened} + - match: {"metadata.persistent_tasks.tasks.0.task.cluster:admin/xpack/ml/job/open.status.state": opened} - do: xpack.ml.close_job: