Persistent Tasks: force writeable name of params and status to be the same as their task (elastic/x-pack-elasticsearch#1072)
Changes persistent task serialization and forces params and status to have the same writeable name as the task itself. Original commit: elastic/x-pack-elasticsearch@59cf3dca39
This commit is contained in:
parent
468507e788
commit
450d47d1f5
|
@ -12,6 +12,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.common.xcontent.XContentParser;
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
import org.elasticsearch.tasks.Task;
|
import org.elasticsearch.tasks.Task;
|
||||||
|
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
|
@ -22,7 +23,7 @@ public enum DatafeedState implements Task.Status {
|
||||||
|
|
||||||
STARTED, STOPPED;
|
STARTED, STOPPED;
|
||||||
|
|
||||||
public static final String NAME = "DatafeedState";
|
public static final String NAME = StartDatafeedAction.NAME;
|
||||||
|
|
||||||
private static final ConstructingObjectParser<DatafeedState, Void> PARSER =
|
private static final ConstructingObjectParser<DatafeedState, Void> PARSER =
|
||||||
new ConstructingObjectParser<>(NAME, args -> fromString((String) args[0]));
|
new ConstructingObjectParser<>(NAME, args -> fromString((String) args[0]));
|
||||||
|
|
|
@ -13,6 +13,7 @@ import org.elasticsearch.common.xcontent.ObjectParser;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.common.xcontent.XContentParser;
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
import org.elasticsearch.tasks.Task;
|
import org.elasticsearch.tasks.Task;
|
||||||
|
import org.elasticsearch.xpack.ml.action.OpenJobAction;
|
||||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -22,7 +23,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constru
|
||||||
|
|
||||||
public class JobTaskStatus implements Task.Status {
|
public class JobTaskStatus implements Task.Status {
|
||||||
|
|
||||||
public static final String NAME = "JobState";
|
public static final String NAME = OpenJobAction.NAME;
|
||||||
|
|
||||||
private static ParseField STATE = new ParseField("state");
|
private static ParseField STATE = new ParseField("state");
|
||||||
private static ParseField ALLOCATION_ID = new ParseField("allocation_id");
|
private static ParseField ALLOCATION_ID = new ParseField("allocation_id");
|
||||||
|
|
|
@ -69,16 +69,21 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
||||||
public static final ConstructingObjectParser<Assignment, Void> ASSIGNMENT_PARSER =
|
public static final ConstructingObjectParser<Assignment, Void> ASSIGNMENT_PARSER =
|
||||||
new ConstructingObjectParser<>("assignment", objects -> new Assignment((String) objects[0], (String) objects[1]));
|
new ConstructingObjectParser<>("assignment", objects -> new Assignment((String) objects[0], (String) objects[1]));
|
||||||
|
|
||||||
private static final NamedObjectParser<PersistentTaskParams, Void> PARAMS_PARSER =
|
private static final NamedObjectParser<TaskDescriptionBuilder<PersistentTaskParams>, Void> TASK_DESCRIPTION_PARSER;
|
||||||
(XContentParser p, Void c, String name) -> p.namedObject(PersistentTaskParams.class, name, null);
|
|
||||||
private static final NamedObjectParser<Status, Void> STATUS_PARSER =
|
|
||||||
(XContentParser p, Void c, String name) -> p.namedObject(Status.class, name, null);
|
|
||||||
|
|
||||||
static {
|
static {
|
||||||
// Tasks parser initialization
|
// Tasks parser initialization
|
||||||
PERSISTENT_TASKS_PARSER.declareLong(Builder::setLastAllocationId, new ParseField("last_allocation_id"));
|
PERSISTENT_TASKS_PARSER.declareLong(Builder::setLastAllocationId, new ParseField("last_allocation_id"));
|
||||||
PERSISTENT_TASKS_PARSER.declareObjectArray(Builder::setTasks, PERSISTENT_TASK_PARSER, new ParseField("tasks"));
|
PERSISTENT_TASKS_PARSER.declareObjectArray(Builder::setTasks, PERSISTENT_TASK_PARSER, new ParseField("tasks"));
|
||||||
|
|
||||||
|
// Task description parser initialization
|
||||||
|
ObjectParser<TaskDescriptionBuilder<PersistentTaskParams>, String> parser = new ObjectParser<>("named");
|
||||||
|
parser.declareObject(TaskDescriptionBuilder::setParams,
|
||||||
|
(p, c) -> p.namedObject(PersistentTaskParams.class, c, null), new ParseField("params"));
|
||||||
|
parser.declareObject(TaskDescriptionBuilder::setStatus,
|
||||||
|
(p, c) -> p.namedObject(Status.class, c, null), new ParseField("status"));
|
||||||
|
TASK_DESCRIPTION_PARSER = (XContentParser p, Void c, String name) -> parser.parse(p, new TaskDescriptionBuilder<>(name), name);
|
||||||
|
|
||||||
// Assignment parser
|
// Assignment parser
|
||||||
ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("executor_node"));
|
ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("executor_node"));
|
||||||
ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("explanation"));
|
ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("explanation"));
|
||||||
|
@ -87,28 +92,46 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
||||||
PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setId, new ParseField("id"));
|
PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setId, new ParseField("id"));
|
||||||
PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setTaskName, new ParseField("name"));
|
PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setTaskName, new ParseField("name"));
|
||||||
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id"));
|
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id"));
|
||||||
PERSISTENT_TASK_PARSER.declareNamedObjects(
|
|
||||||
(TaskBuilder<PersistentTaskParams> taskBuilder, List<PersistentTaskParams> objects) -> {
|
|
||||||
if (objects.size() != 1) {
|
|
||||||
throw new IllegalArgumentException("only one params per task is allowed");
|
|
||||||
}
|
|
||||||
taskBuilder.setParams(objects.get(0));
|
|
||||||
}, PARAMS_PARSER, new ParseField("params"));
|
|
||||||
|
|
||||||
PERSISTENT_TASK_PARSER.declareNamedObjects(
|
PERSISTENT_TASK_PARSER.declareNamedObjects(
|
||||||
(TaskBuilder<PersistentTaskParams> taskBuilder, List<Status> objects) -> {
|
(TaskBuilder<PersistentTaskParams> taskBuilder, List<TaskDescriptionBuilder<PersistentTaskParams>> objects) -> {
|
||||||
if (objects.size() != 1) {
|
if (objects.size() != 1) {
|
||||||
throw new IllegalArgumentException("only one status per task is allowed");
|
throw new IllegalArgumentException("only one task description per task is allowed");
|
||||||
}
|
}
|
||||||
taskBuilder.setStatus(objects.get(0));
|
TaskDescriptionBuilder<PersistentTaskParams> builder = objects.get(0);
|
||||||
}, STATUS_PARSER, new ParseField("status"));
|
taskBuilder.setTaskName(builder.taskName);
|
||||||
|
taskBuilder.setParams(builder.params);
|
||||||
|
taskBuilder.setStatus(builder.status);
|
||||||
|
}, TASK_DESCRIPTION_PARSER, new ParseField("task"));
|
||||||
PERSISTENT_TASK_PARSER.declareObject(TaskBuilder::setAssignment, ASSIGNMENT_PARSER, new ParseField("assignment"));
|
PERSISTENT_TASK_PARSER.declareObject(TaskBuilder::setAssignment, ASSIGNMENT_PARSER, new ParseField("assignment"));
|
||||||
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationIdOnLastStatusUpdate,
|
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationIdOnLastStatusUpdate,
|
||||||
new ParseField("allocation_id_on_last_status_update"));
|
new ParseField("allocation_id_on_last_status_update"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Private builder used in XContent parser to build task-specific portion (params and status)
|
||||||
|
*/
|
||||||
|
private static class TaskDescriptionBuilder<Params extends PersistentTaskParams> {
|
||||||
|
private final String taskName;
|
||||||
|
private Params params;
|
||||||
|
private Status status;
|
||||||
|
|
||||||
|
private TaskDescriptionBuilder(String taskName) {
|
||||||
|
this.taskName = taskName;
|
||||||
|
}
|
||||||
|
|
||||||
|
private TaskDescriptionBuilder setParams(Params params) {
|
||||||
|
this.params = params;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private TaskDescriptionBuilder setStatus(Status status) {
|
||||||
|
this.status = status;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public Collection<PersistentTask<?>> tasks() {
|
public Collection<PersistentTask<?>> tasks() {
|
||||||
return this.tasks.values();
|
return this.tasks.values();
|
||||||
}
|
}
|
||||||
|
@ -266,6 +289,18 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
||||||
this.status = status;
|
this.status = status;
|
||||||
this.assignment = assignment;
|
this.assignment = assignment;
|
||||||
this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate;
|
this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate;
|
||||||
|
if (params != null) {
|
||||||
|
if (params.getWriteableName().equals(taskName) == false) {
|
||||||
|
throw new IllegalArgumentException("params have to have the same writeable name as task. params: " +
|
||||||
|
params.getWriteableName() + " task: " + taskName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (status != null) {
|
||||||
|
if (status.getWriteableName().equals(taskName) == false) {
|
||||||
|
throw new IllegalArgumentException("status has to have the same writeable name as task. status: " +
|
||||||
|
status.getWriteableName() + " task: " + taskName);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
@ -371,21 +406,20 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
||||||
builder.startObject();
|
builder.startObject();
|
||||||
{
|
{
|
||||||
builder.field("id", id);
|
builder.field("id", id);
|
||||||
builder.field("name", taskName);
|
builder.startObject("task");
|
||||||
if (params != null) {
|
{
|
||||||
builder.startObject("params");
|
builder.startObject(taskName);
|
||||||
{
|
{
|
||||||
builder.field(params.getWriteableName(), params, xParams);
|
if (params != null) {
|
||||||
}
|
builder.field("params", params, xParams);
|
||||||
builder.endObject();
|
}
|
||||||
}
|
if (status != null) {
|
||||||
if (status != null) {
|
builder.field("status", status, xParams);
|
||||||
builder.startObject("status");
|
}
|
||||||
{
|
|
||||||
builder.field(status.getWriteableName(), status, xParams);
|
|
||||||
}
|
}
|
||||||
builder.endObject();
|
builder.endObject();
|
||||||
}
|
}
|
||||||
|
builder.endObject();
|
||||||
|
|
||||||
if (API_CONTEXT.equals(xParams.param(MetaData.CONTEXT_MODE_PARAM, API_CONTEXT))) {
|
if (API_CONTEXT.equals(xParams.param(MetaData.CONTEXT_MODE_PARAM, API_CONTEXT))) {
|
||||||
// These are transient values that shouldn't be persisted to gateway cluster state or snapshot
|
// These are transient values that shouldn't be persisted to gateway cluster state or snapshot
|
||||||
|
|
|
@ -61,7 +61,7 @@ public class StartPersistentTaskAction extends Action<StartPersistentTaskAction.
|
||||||
private String taskId;
|
private String taskId;
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
private String action;
|
private String taskName;
|
||||||
|
|
||||||
private PersistentTaskParams params;
|
private PersistentTaskParams params;
|
||||||
|
|
||||||
|
@ -69,9 +69,9 @@ public class StartPersistentTaskAction extends Action<StartPersistentTaskAction.
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Request(String taskId, String action, PersistentTaskParams params) {
|
public Request(String taskId, String taskName, PersistentTaskParams params) {
|
||||||
this.taskId = taskId;
|
this.taskId = taskId;
|
||||||
this.action = action;
|
this.taskName = taskName;
|
||||||
this.params = params;
|
this.params = params;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,7 +79,7 @@ public class StartPersistentTaskAction extends Action<StartPersistentTaskAction.
|
||||||
public void readFrom(StreamInput in) throws IOException {
|
public void readFrom(StreamInput in) throws IOException {
|
||||||
super.readFrom(in);
|
super.readFrom(in);
|
||||||
taskId = in.readString();
|
taskId = in.readString();
|
||||||
action = in.readString();
|
taskName = in.readString();
|
||||||
params = in.readOptionalNamedWriteable(PersistentTaskParams.class);
|
params = in.readOptionalNamedWriteable(PersistentTaskParams.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,7 +87,7 @@ public class StartPersistentTaskAction extends Action<StartPersistentTaskAction.
|
||||||
public void writeTo(StreamOutput out) throws IOException {
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
super.writeTo(out);
|
super.writeTo(out);
|
||||||
out.writeString(taskId);
|
out.writeString(taskId);
|
||||||
out.writeString(action);
|
out.writeString(taskName);
|
||||||
out.writeOptionalNamedWriteable(params);
|
out.writeOptionalNamedWriteable(params);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,9 +97,15 @@ public class StartPersistentTaskAction extends Action<StartPersistentTaskAction.
|
||||||
if (this.taskId == null) {
|
if (this.taskId == null) {
|
||||||
validationException = addValidationError("task id must be specified", validationException);
|
validationException = addValidationError("task id must be specified", validationException);
|
||||||
}
|
}
|
||||||
if (this.action == null) {
|
if (this.taskName == null) {
|
||||||
validationException = addValidationError("action must be specified", validationException);
|
validationException = addValidationError("action must be specified", validationException);
|
||||||
}
|
}
|
||||||
|
if (params != null) {
|
||||||
|
if (params.getWriteableName().equals(taskName) == false) {
|
||||||
|
validationException = addValidationError("params have to have the same writeable name as task. params: " +
|
||||||
|
params.getWriteableName() + " task: " + taskName, validationException);
|
||||||
|
}
|
||||||
|
}
|
||||||
return validationException;
|
return validationException;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,21 +114,21 @@ public class StartPersistentTaskAction extends Action<StartPersistentTaskAction.
|
||||||
if (this == o) return true;
|
if (this == o) return true;
|
||||||
if (o == null || getClass() != o.getClass()) return false;
|
if (o == null || getClass() != o.getClass()) return false;
|
||||||
Request request1 = (Request) o;
|
Request request1 = (Request) o;
|
||||||
return Objects.equals(taskId, request1.taskId) && Objects.equals(action, request1.action) &&
|
return Objects.equals(taskId, request1.taskId) && Objects.equals(taskName, request1.taskName) &&
|
||||||
Objects.equals(params, request1.params);
|
Objects.equals(params, request1.params);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return Objects.hash(taskId, action, params);
|
return Objects.hash(taskId, taskName, params);
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getAction() {
|
public String getTaskName() {
|
||||||
return action;
|
return taskName;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setAction(String action) {
|
public void setTaskName(String taskName) {
|
||||||
this.action = action;
|
this.taskName = taskName;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getTaskId() {
|
public String getTaskId() {
|
||||||
|
@ -157,7 +163,7 @@ public class StartPersistentTaskAction extends Action<StartPersistentTaskAction.
|
||||||
}
|
}
|
||||||
|
|
||||||
public RequestBuilder setAction(String action) {
|
public RequestBuilder setAction(String action) {
|
||||||
request.setAction(action);
|
request.setTaskName(action);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -206,7 +212,7 @@ public class StartPersistentTaskAction extends Action<StartPersistentTaskAction.
|
||||||
@Override
|
@Override
|
||||||
protected final void masterOperation(final Request request, ClusterState state,
|
protected final void masterOperation(final Request request, ClusterState state,
|
||||||
final ActionListener<PersistentTaskResponse> listener) {
|
final ActionListener<PersistentTaskResponse> listener) {
|
||||||
persistentTasksClusterService.createPersistentTask(request.taskId, request.action, request.params,
|
persistentTasksClusterService.createPersistentTask(request.taskId, request.taskName, request.params,
|
||||||
new ActionListener<PersistentTask<?>>() {
|
new ActionListener<PersistentTask<?>>() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -9,7 +9,6 @@ import org.elasticsearch.action.Action;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.ActionRequestValidationException;
|
import org.elasticsearch.action.ActionRequestValidationException;
|
||||||
import org.elasticsearch.action.support.ActionFilters;
|
import org.elasticsearch.action.support.ActionFilters;
|
||||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
|
||||||
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
|
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
|
||||||
import org.elasticsearch.action.support.master.MasterNodeRequest;
|
import org.elasticsearch.action.support.master.MasterNodeRequest;
|
||||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||||
|
@ -25,13 +24,14 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.tasks.Task;
|
import org.elasticsearch.tasks.Task;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportResponse.Empty;
|
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
|
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||||
|
|
||||||
public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTaskStatusAction.Request,
|
public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTaskStatusAction.Request,
|
||||||
PersistentTaskResponse,
|
PersistentTaskResponse,
|
||||||
UpdatePersistentTaskStatusAction.RequestBuilder> {
|
UpdatePersistentTaskStatusAction.RequestBuilder> {
|
||||||
|
@ -57,7 +57,7 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
|
||||||
|
|
||||||
private String taskId;
|
private String taskId;
|
||||||
|
|
||||||
private long allocationId;
|
private long allocationId = -1L;
|
||||||
|
|
||||||
private Task.Status status;
|
private Task.Status status;
|
||||||
|
|
||||||
|
@ -101,7 +101,16 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ActionRequestValidationException validate() {
|
public ActionRequestValidationException validate() {
|
||||||
return null;
|
ActionRequestValidationException validationException = null;
|
||||||
|
if (this.taskId == null) {
|
||||||
|
validationException = addValidationError("task id must be specified", validationException);
|
||||||
|
}
|
||||||
|
if (this.allocationId == -1L) {
|
||||||
|
validationException = addValidationError("allocationId must be specified", validationException);
|
||||||
|
}
|
||||||
|
// We cannot really check if status has the same type as task because we don't have access
|
||||||
|
// to the task here. We will check it when we try to update the task
|
||||||
|
return validationException;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -22,6 +22,7 @@ import org.elasticsearch.test.VersionUtils;
|
||||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
|
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
|
||||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||||
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestParams;
|
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestParams;
|
||||||
|
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -79,7 +80,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
|
||||||
addTestNodes(nodes, randomIntBetween(1, 10));
|
addTestNodes(nodes, randomIntBetween(1, 10));
|
||||||
int numberOfTasks = randomIntBetween(2, 40);
|
int numberOfTasks = randomIntBetween(2, 40);
|
||||||
for (int i = 0; i < numberOfTasks; i++) {
|
for (int i = 0; i < numberOfTasks; i++) {
|
||||||
addTask(tasks, "should_assign", "assign_one", randomBoolean() ? null : "no_longer_exits");
|
addTask(tasks, "assign_one", randomBoolean() ? null : "no_longer_exits");
|
||||||
}
|
}
|
||||||
|
|
||||||
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build());
|
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build());
|
||||||
|
@ -103,14 +104,14 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
|
||||||
switch (randomInt(2)) {
|
switch (randomInt(2)) {
|
||||||
case 0:
|
case 0:
|
||||||
// add an unassigned task that should get assigned because it's assigned to a non-existing node or unassigned
|
// add an unassigned task that should get assigned because it's assigned to a non-existing node or unassigned
|
||||||
addTask(tasks, "should_assign", "assign_me", randomBoolean() ? null : "no_longer_exits");
|
addTask(tasks, "assign_me", randomBoolean() ? null : "no_longer_exits");
|
||||||
break;
|
break;
|
||||||
case 1:
|
case 1:
|
||||||
// add a task assigned to non-existing node that should not get assigned
|
// add a task assigned to non-existing node that should not get assigned
|
||||||
addTask(tasks, "should_not_assign", "dont_assign_me", randomBoolean() ? null : "no_longer_exits");
|
addTask(tasks, "dont_assign_me", randomBoolean() ? null : "no_longer_exits");
|
||||||
break;
|
break;
|
||||||
case 2:
|
case 2:
|
||||||
addTask(tasks, "assign_one", "assign_one", randomBoolean() ? null : "no_longer_exits");
|
addTask(tasks, "assign_one", randomBoolean() ? null : "no_longer_exits");
|
||||||
break;
|
break;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -129,8 +130,8 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
|
||||||
|
|
||||||
for (PersistentTask<?> task : tasksInProgress.tasks()) {
|
for (PersistentTask<?> task : tasksInProgress.tasks()) {
|
||||||
// explanation should correspond to the action name
|
// explanation should correspond to the action name
|
||||||
switch (task.getTaskName()) {
|
switch (((TestParams) task.getParams()).getTestParam()) {
|
||||||
case "should_assign":
|
case "assign_me":
|
||||||
assertThat(task.getExecutorNode(), notNullValue());
|
assertThat(task.getExecutorNode(), notNullValue());
|
||||||
assertThat(task.isAssigned(), equalTo(true));
|
assertThat(task.isAssigned(), equalTo(true));
|
||||||
if (clusterState.nodes().nodeExists(task.getExecutorNode()) == false) {
|
if (clusterState.nodes().nodeExists(task.getExecutorNode()) == false) {
|
||||||
|
@ -140,7 +141,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
|
||||||
clusterState.nodes().nodeExists(task.getExecutorNode()), equalTo(true));
|
clusterState.nodes().nodeExists(task.getExecutorNode()), equalTo(true));
|
||||||
assertThat(task.getAssignment().getExplanation(), equalTo("test assignment"));
|
assertThat(task.getAssignment().getExplanation(), equalTo("test assignment"));
|
||||||
break;
|
break;
|
||||||
case "should_not_assign":
|
case "dont_assign_me":
|
||||||
assertThat(task.getExecutorNode(), nullValue());
|
assertThat(task.getExecutorNode(), nullValue());
|
||||||
assertThat(task.isAssigned(), equalTo(false));
|
assertThat(task.isAssigned(), equalTo(false));
|
||||||
assertThat(task.getAssignment().getExplanation(), equalTo("no appropriate nodes found for the assignment"));
|
assertThat(task.getAssignment().getExplanation(), equalTo("no appropriate nodes found for the assignment"));
|
||||||
|
@ -196,7 +197,9 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
|
||||||
private Assignment assignOnlyOneTaskAtATime(ClusterState clusterState) {
|
private Assignment assignOnlyOneTaskAtATime(ClusterState clusterState) {
|
||||||
DiscoveryNodes nodes = clusterState.nodes();
|
DiscoveryNodes nodes = clusterState.nodes();
|
||||||
PersistentTasksCustomMetaData tasksInProgress = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
PersistentTasksCustomMetaData tasksInProgress = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||||
if (tasksInProgress.findTasks("assign_one", task -> nodes.nodeExists(task.getExecutorNode())).isEmpty()) {
|
if (tasksInProgress.findTasks(TestPersistentTasksExecutor.NAME, task ->
|
||||||
|
"assign_one".equals(((TestParams) task.getParams()).getTestParam()) &&
|
||||||
|
nodes.nodeExists(task.getExecutorNode())).isEmpty()) {
|
||||||
return randomNodeAssignment(clusterState.nodes());
|
return randomNodeAssignment(clusterState.nodes());
|
||||||
} else {
|
} else {
|
||||||
return new Assignment(null, "only one task can be assigned at a time");
|
return new Assignment(null, "only one task can be assigned at a time");
|
||||||
|
@ -390,11 +393,12 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
|
||||||
MetaData.Builder metaData, PersistentTasksCustomMetaData.Builder tasks,
|
MetaData.Builder metaData, PersistentTasksCustomMetaData.Builder tasks,
|
||||||
Assignment assignment, String param) {
|
Assignment assignment, String param) {
|
||||||
return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasksCustomMetaData.TYPE,
|
return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasksCustomMetaData.TYPE,
|
||||||
tasks.addTask(UUIDs.base64UUID(), randomAlphaOfLength(10), new TestParams(param), assignment).build()));
|
tasks.addTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams(param), assignment).build()));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addTask(PersistentTasksCustomMetaData.Builder tasks, String action, String param, String node) {
|
private void addTask(PersistentTasksCustomMetaData.Builder tasks, String param, String node) {
|
||||||
tasks.addTask(UUIDs.base64UUID(), action, new TestParams(param), new Assignment(node, "explanation: " + action));
|
tasks.addTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams(param),
|
||||||
|
new Assignment(node, "explanation: " + param));
|
||||||
}
|
}
|
||||||
|
|
||||||
private DiscoveryNode newNode(String nodeId) {
|
private DiscoveryNode newNode(String nodeId) {
|
||||||
|
|
|
@ -68,7 +68,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
|
||||||
new Entry(MetaData.Custom.class, PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData::new),
|
new Entry(MetaData.Custom.class, PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData::new),
|
||||||
new Entry(NamedDiff.class, PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData::readDiffFrom),
|
new Entry(NamedDiff.class, PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData::readDiffFrom),
|
||||||
new Entry(PersistentTaskParams.class, TestPersistentTasksExecutor.NAME, TestParams::new),
|
new Entry(PersistentTaskParams.class, TestPersistentTasksExecutor.NAME, TestParams::new),
|
||||||
new Entry(Task.Status.class, Status.NAME, Status::new)
|
new Entry(Task.Status.class, TestPersistentTasksExecutor.NAME, Status::new)
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -151,7 +151,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
|
||||||
return new NamedXContentRegistry(Arrays.asList(
|
return new NamedXContentRegistry(Arrays.asList(
|
||||||
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(TestPersistentTasksExecutor.NAME),
|
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(TestPersistentTasksExecutor.NAME),
|
||||||
TestParams::fromXContent),
|
TestParams::fromXContent),
|
||||||
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(Status.NAME), Status::fromXContent)
|
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(TestPersistentTasksExecutor.NAME), Status::fromXContent)
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -189,9 +189,9 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
|
||||||
assertThrows(future1, IllegalStateException.class, "timed out after 10ms");
|
assertThrows(future1, IllegalStateException.class, "timed out after 10ms");
|
||||||
|
|
||||||
PlainActionFuture<PersistentTask<?>> failedUpdateFuture = new PlainActionFuture<>();
|
PlainActionFuture<PersistentTask<?>> failedUpdateFuture = new PlainActionFuture<>();
|
||||||
persistentTasksService.updateStatus(taskId, -1, new Status("should fail"), failedUpdateFuture);
|
persistentTasksService.updateStatus(taskId, -2, new Status("should fail"), failedUpdateFuture);
|
||||||
assertThrows(failedUpdateFuture, ResourceNotFoundException.class, "the task with id " + taskId +
|
assertThrows(failedUpdateFuture, ResourceNotFoundException.class, "the task with id " + taskId +
|
||||||
" and allocation id -1 doesn't exist");
|
" and allocation id -2 doesn't exist");
|
||||||
|
|
||||||
// Wait for the task to disappear
|
// Wait for the task to disappear
|
||||||
WaitForPersistentTaskStatusFuture<?> future2 = new WaitForPersistentTaskStatusFuture<>();
|
WaitForPersistentTaskStatusFuture<?> future2 = new WaitForPersistentTaskStatusFuture<>();
|
||||||
|
|
|
@ -9,6 +9,7 @@ import org.elasticsearch.common.UUIDs;
|
||||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||||
import org.elasticsearch.test.AbstractStreamableTestCase;
|
import org.elasticsearch.test.AbstractStreamableTestCase;
|
||||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||||
|
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
|
||||||
|
@ -20,7 +21,7 @@ public class PersistentTasksExecutorResponseTests extends AbstractStreamableTest
|
||||||
protected PersistentTaskResponse createTestInstance() {
|
protected PersistentTaskResponse createTestInstance() {
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
return new PersistentTaskResponse(
|
return new PersistentTaskResponse(
|
||||||
new PersistentTask<PersistentTaskParams>(UUIDs.base64UUID(), randomAsciiOfLength(10),
|
new PersistentTask<PersistentTaskParams>(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME,
|
||||||
new TestPersistentTasksPlugin.TestParams("test"),
|
new TestPersistentTasksPlugin.TestParams("test"),
|
||||||
randomLong(), PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT));
|
randomLong(), PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT));
|
||||||
} else {
|
} else {
|
||||||
|
@ -37,7 +38,7 @@ public class PersistentTasksExecutorResponseTests extends AbstractStreamableTest
|
||||||
protected NamedWriteableRegistry getNamedWriteableRegistry() {
|
protected NamedWriteableRegistry getNamedWriteableRegistry() {
|
||||||
return new NamedWriteableRegistry(Collections.singletonList(
|
return new NamedWriteableRegistry(Collections.singletonList(
|
||||||
new NamedWriteableRegistry.Entry(PersistentTaskParams.class,
|
new NamedWriteableRegistry.Entry(PersistentTaskParams.class,
|
||||||
TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME, TestPersistentTasksPlugin.TestParams::new)
|
TestPersistentTasksExecutor.NAME, TestPersistentTasksPlugin.TestParams::new)
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
|
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
|
||||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||||
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestParams;
|
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestParams;
|
||||||
|
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -66,7 +67,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
||||||
PersistentTasksService persistentTasksService = mock(PersistentTasksService.class);
|
PersistentTasksService persistentTasksService = mock(PersistentTasksService.class);
|
||||||
@SuppressWarnings("unchecked") PersistentTasksExecutor<TestParams> action = mock(PersistentTasksExecutor.class);
|
@SuppressWarnings("unchecked") PersistentTasksExecutor<TestParams> action = mock(PersistentTasksExecutor.class);
|
||||||
when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
|
when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
|
||||||
when(action.getTaskName()).thenReturn("test");
|
when(action.getTaskName()).thenReturn(TestPersistentTasksExecutor.NAME);
|
||||||
int nonLocalNodesCount = randomInt(10);
|
int nonLocalNodesCount = randomInt(10);
|
||||||
// need to account for 5 original tasks on each node and their relocations
|
// need to account for 5 original tasks on each node and their relocations
|
||||||
for (int i = 0; i < (nonLocalNodesCount + 1) * 10; i++) {
|
for (int i = 0; i < (nonLocalNodesCount + 1) * 10; i++) {
|
||||||
|
@ -87,11 +88,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
||||||
boolean added = false;
|
boolean added = false;
|
||||||
if (nonLocalNodesCount > 0) {
|
if (nonLocalNodesCount > 0) {
|
||||||
for (int i = 0; i < randomInt(5); i++) {
|
for (int i = 0; i < randomInt(5); i++) {
|
||||||
tasks.addTask(UUIDs.base64UUID(), "test_action", new TestParams("other_" + i),
|
tasks.addTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("other_" + i),
|
||||||
new Assignment("other_node_" + randomInt(nonLocalNodesCount), "test assignment on other node"));
|
new Assignment("other_node_" + randomInt(nonLocalNodesCount), "test assignment on other node"));
|
||||||
if (added == false && randomBoolean()) {
|
if (added == false && randomBoolean()) {
|
||||||
added = true;
|
added = true;
|
||||||
tasks.addTask(UUIDs.base64UUID(), "test", new TestParams("this_param"),
|
tasks.addTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("this_param"),
|
||||||
new Assignment("this_node", "test assignment on this node"));
|
new Assignment("this_node", "test assignment on this node"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -112,7 +113,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
||||||
|
|
||||||
// Add task on some other node
|
// Add task on some other node
|
||||||
state = newClusterState;
|
state = newClusterState;
|
||||||
newClusterState = addTask(state, "test", null, "some_other_node");
|
newClusterState = addTask(state, TestPersistentTasksExecutor.NAME, null, "some_other_node");
|
||||||
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
|
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
|
||||||
|
|
||||||
// Make sure action wasn't called again
|
// Make sure action wasn't called again
|
||||||
|
@ -120,7 +121,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
||||||
|
|
||||||
// Start another task on this node
|
// Start another task on this node
|
||||||
state = newClusterState;
|
state = newClusterState;
|
||||||
newClusterState = addTask(state, "test", new TestParams("this_param"), "this_node");
|
newClusterState = addTask(state, TestPersistentTasksExecutor.NAME, new TestParams("this_param"), "this_node");
|
||||||
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
|
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
|
||||||
|
|
||||||
// Make sure action was called this time
|
// Make sure action was called this time
|
||||||
|
@ -135,7 +136,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
||||||
|
|
||||||
// Add task on some other node
|
// Add task on some other node
|
||||||
state = newClusterState;
|
state = newClusterState;
|
||||||
newClusterState = addTask(state, "test", null, "some_other_node");
|
newClusterState = addTask(state, TestPersistentTasksExecutor.NAME, null, "some_other_node");
|
||||||
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
|
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
|
||||||
|
|
||||||
// Make sure action wasn't called again
|
// Make sure action wasn't called again
|
||||||
|
|
|
@ -110,7 +110,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
|
||||||
PersistentTasksCustomMetaData::new),
|
PersistentTasksCustomMetaData::new),
|
||||||
new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasksCustomMetaData.TYPE,
|
new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasksCustomMetaData.TYPE,
|
||||||
PersistentTasksCustomMetaData::readDiffFrom),
|
PersistentTasksCustomMetaData::readDiffFrom),
|
||||||
new NamedWriteableRegistry.Entry(Task.Status.class, Status.NAME, Status::new)
|
new NamedWriteableRegistry.Entry(Task.Status.class, TestPersistentTasksExecutor.NAME, Status::new)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,7 +121,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
|
||||||
PersistentTasksCustomMetaData::fromXContent),
|
PersistentTasksCustomMetaData::fromXContent),
|
||||||
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(TestPersistentTasksExecutor.NAME),
|
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(TestPersistentTasksExecutor.NAME),
|
||||||
TestParams::fromXContent),
|
TestParams::fromXContent),
|
||||||
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(Status.NAME), Status::fromXContent)
|
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(TestPersistentTasksExecutor.NAME), Status::fromXContent)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -211,7 +211,6 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Status implements Task.Status {
|
public static class Status implements Task.Status {
|
||||||
public static final String NAME = "test";
|
|
||||||
|
|
||||||
private final String phase;
|
private final String phase;
|
||||||
|
|
||||||
|
@ -232,7 +231,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getWriteableName() {
|
public String getWriteableName() {
|
||||||
return NAME;
|
return TestPersistentTasksExecutor.NAME;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||||
import org.elasticsearch.tasks.Task;
|
import org.elasticsearch.tasks.Task;
|
||||||
import org.elasticsearch.test.AbstractStreamableTestCase;
|
import org.elasticsearch.test.AbstractStreamableTestCase;
|
||||||
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.Status;
|
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.Status;
|
||||||
|
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
|
||||||
import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction.Request;
|
import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction.Request;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -29,7 +30,7 @@ public class UpdatePersistentTaskRequestTests extends AbstractStreamableTestCase
|
||||||
@Override
|
@Override
|
||||||
protected NamedWriteableRegistry getNamedWriteableRegistry() {
|
protected NamedWriteableRegistry getNamedWriteableRegistry() {
|
||||||
return new NamedWriteableRegistry(Collections.singletonList(
|
return new NamedWriteableRegistry(Collections.singletonList(
|
||||||
new NamedWriteableRegistry.Entry(Task.Status.class, Status.NAME, Status::new)
|
new NamedWriteableRegistry.Entry(Task.Status.class, TestPersistentTasksExecutor.NAME, Status::new)
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -339,7 +339,7 @@
|
||||||
cluster.state:
|
cluster.state:
|
||||||
metric: [ metadata ]
|
metric: [ metadata ]
|
||||||
filter_path: metadata.persistent_tasks
|
filter_path: metadata.persistent_tasks
|
||||||
- match: {metadata.persistent_tasks.tasks.0.status.JobState.state: opened}
|
- match: {"metadata.persistent_tasks.tasks.0.task.cluster:admin/xpack/ml/job/open.status.state": opened}
|
||||||
|
|
||||||
- do:
|
- do:
|
||||||
xpack.ml.close_job:
|
xpack.ml.close_job:
|
||||||
|
@ -396,7 +396,7 @@
|
||||||
cluster.state:
|
cluster.state:
|
||||||
metric: [ metadata ]
|
metric: [ metadata ]
|
||||||
filter_path: metadata.persistent_tasks
|
filter_path: metadata.persistent_tasks
|
||||||
- match: {metadata.persistent_tasks.tasks.0.status.JobState.state: opened}
|
- match: {"metadata.persistent_tasks.tasks.0.task.cluster:admin/xpack/ml/job/open.status.state": opened}
|
||||||
|
|
||||||
- do:
|
- do:
|
||||||
xpack.ml.close_job:
|
xpack.ml.close_job:
|
||||||
|
|
Loading…
Reference in New Issue