diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 49b9c137291..1c73460e59d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -123,8 +123,8 @@ import org.elasticsearch.xpack.ml.rest.results.RestGetRecordsAction; import org.elasticsearch.xpack.ml.rest.validate.RestValidateDetectorAction; import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction; import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction; -import org.elasticsearch.xpack.persistent.CreatePersistentTaskAction; -import org.elasticsearch.xpack.persistent.PersistentTaskRequest; +import org.elasticsearch.xpack.persistent.PersistentTaskParams; +import org.elasticsearch.xpack.persistent.StartPersistentTaskAction; import org.elasticsearch.xpack.persistent.PersistentTasksClusterService; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksExecutorRegistry; @@ -216,8 +216,8 @@ public class MachineLearning implements ActionPlugin { PersistentTasksCustomMetaData::readDiffFrom), // Persistent action requests - new NamedWriteableRegistry.Entry(PersistentTaskRequest.class, StartDatafeedAction.NAME, StartDatafeedAction.Request::new), - new NamedWriteableRegistry.Entry(PersistentTaskRequest.class, OpenJobAction.NAME, OpenJobAction.Request::new), + new NamedWriteableRegistry.Entry(PersistentTaskParams.class, StartDatafeedAction.NAME, StartDatafeedAction.Request::new), + new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.NAME, OpenJobAction.Request::new), // Task statuses new NamedWriteableRegistry.Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, @@ -236,9 +236,9 @@ public class MachineLearning implements ActionPlugin { PersistentTasksCustomMetaData::fromXContent), // Persistent action requests - new NamedXContentRegistry.Entry(PersistentTaskRequest.class, new ParseField(StartDatafeedAction.NAME), + new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(StartDatafeedAction.NAME), StartDatafeedAction.Request::fromXContent), - new NamedXContentRegistry.Entry(PersistentTaskRequest.class, new ParseField(OpenJobAction.NAME), + new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(OpenJobAction.NAME), OpenJobAction.Request::fromXContent), // Task statuses @@ -422,7 +422,7 @@ public class MachineLearning implements ActionPlugin { new ActionHandler<>(StartDatafeedAction.INSTANCE, StartDatafeedAction.TransportAction.class), new ActionHandler<>(StopDatafeedAction.INSTANCE, StopDatafeedAction.TransportAction.class), new ActionHandler<>(DeleteModelSnapshotAction.INSTANCE, DeleteModelSnapshotAction.TransportAction.class), - new ActionHandler<>(CreatePersistentTaskAction.INSTANCE, CreatePersistentTaskAction.TransportAction.class), + new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class), new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class), new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class), new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class), diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java index beb987c2397..ebe630112ad 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java @@ -79,7 +79,7 @@ public class MlAssignmentNotifier extends AbstractComponent implements ClusterSt continue; } if (OpenJobAction.NAME.equals(currentTask.getTaskName())) { - String jobId = ((OpenJobAction.Request) currentTask.getRequest()).getJobId(); + String jobId = ((OpenJobAction.Request) currentTask.getParams()).getJobId(); if (currentAssignment.getExecutorNode() == null) { auditor.warning(jobId, "No node found to open job. Reasons [" + currentAssignment.getExplanation() + "]"); } else { @@ -87,7 +87,7 @@ public class MlAssignmentNotifier extends AbstractComponent implements ClusterSt auditor.info(jobId, "Opening job on node [" + node.toString() + "]"); } } else if (StartDatafeedAction.NAME.equals(currentTask.getTaskName())) { - String datafeedId = ((StartDatafeedAction.Request) currentTask.getRequest()).getDatafeedId(); + String datafeedId = ((StartDatafeedAction.Request) currentTask.getParams()).getDatafeedId(); MlMetadata mlMetadata = event.state().getMetaData().custom(MlMetadata.TYPE); DatafeedConfig datafeedConfig = mlMetadata.getDatafeed(datafeedId); if (currentAssignment.getExecutorNode() == null) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index 5cde7d3de3f..ca94ce6bc1e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -11,6 +11,7 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.ActionFilters; @@ -49,7 +50,7 @@ import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.AllocatedPersistentTask; -import org.elasticsearch.xpack.persistent.PersistentTaskRequest; +import org.elasticsearch.xpack.persistent.PersistentTaskParams; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; @@ -86,7 +87,7 @@ public class OpenJobAction extends Action persistentTask) { + return new JobTask(persistentTask.getParams().getJobId(), id, type, action, parentTaskId); + } + void setMaxConcurrentJobAllocations(int maxConcurrentJobAllocations) { logger.info("Changing [{}] from [{}] to [{}]", MachineLearning.CONCURRENT_JOB_ALLOCATIONS.getKey(), this.maxConcurrentJobAllocations, maxConcurrentJobAllocations); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index 4067c32c6d4..4c08189f185 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -12,6 +12,7 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ValidateActions; @@ -56,7 +57,7 @@ import org.elasticsearch.xpack.ml.job.config.JobTaskStatus; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.AllocatedPersistentTask; -import org.elasticsearch.xpack.persistent.PersistentTaskRequest; +import org.elasticsearch.xpack.persistent.PersistentTaskParams; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; @@ -94,7 +95,7 @@ public class StartDatafeedAction return new Response(); } - public static class Request extends PersistentTaskRequest implements ToXContent { + public static class Request extends ActionRequest implements PersistentTaskParams, ToXContent { public static ObjectParser PARSER = new ObjectParser<>(NAME, Request::new); @@ -190,11 +191,6 @@ public class StartDatafeedAction return e; } - @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new DatafeedTask(id, type, action, parentTaskId, this); - } - @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -454,6 +450,11 @@ public class StartDatafeedAction }); } + @Override + protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId, + PersistentTask persistentTask) { + return new DatafeedTask(id, type, action, parentTaskId, persistentTask.getParams()); + } } static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasksCustomMetaData tasks) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/NodePersistentTasksExecutor.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/NodePersistentTasksExecutor.java index 76c5d240209..8133fb4724d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/NodePersistentTasksExecutor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/NodePersistentTasksExecutor.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.persistent; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.threadpool.ThreadPool; @@ -20,10 +21,10 @@ public class NodePersistentTasksExecutor { this.threadPool = threadPool; } - public void executeTask(Request request, - AllocatedPersistentTask task, - PersistentTasksExecutor action) { - threadPool.executor(action.getExecutor()).execute(new AbstractRunnable() { + public void executeTask(@Nullable Params params, + AllocatedPersistentTask task, + PersistentTasksExecutor executor) { + threadPool.executor(executor.getExecutor()).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { task.markAsFailed(e); @@ -33,7 +34,7 @@ public class NodePersistentTasksExecutor { @Override protected void doRun() throws Exception { try { - action.nodeOperation(task, request); + executor.nodeOperation(task, params); } catch (Exception ex) { task.markAsFailed(ex); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskParams.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskParams.java new file mode 100644 index 00000000000..478bbadfc26 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskParams.java @@ -0,0 +1,16 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.persistent; + +import org.elasticsearch.common.io.stream.NamedWriteable; +import org.elasticsearch.common.xcontent.ToXContent; + +/** + * Parameters used to start persistent task + */ +public interface PersistentTaskParams extends NamedWriteable, ToXContent { + +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskRequest.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskRequest.java deleted file mode 100644 index 38205801ca9..00000000000 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskRequest.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.persistent; - -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.common.io.stream.NamedWriteable; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskId; - -/** - * Base class for a request for a persistent task - */ -public abstract class PersistentTaskRequest extends ActionRequest implements NamedWriteable, ToXContent { - @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new AllocatedPersistentTask(id, type, action, getDescription(), parentTaskId); - } -} \ No newline at end of file diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterService.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterService.java index 1f1e2825cd3..37e1f9025fe 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterService.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; @@ -44,11 +45,11 @@ public class PersistentTasksClusterService extends AbstractComponent implements * Creates a new persistent task on master node * * @param action the action name - * @param request request + * @param params params * @param listener the listener that will be called when task is started */ - public void createPersistentTask(String taskId, String action, Request request, - ActionListener> listener) { + public void createPersistentTask(String taskId, String action, @Nullable Params params, + ActionListener> listener) { clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { @@ -56,10 +57,10 @@ public class PersistentTasksClusterService extends AbstractComponent implements if (builder.hasTask(taskId)) { throw new ResourceAlreadyExistsException("task with id {" + taskId + "} already exist"); } - validate(action, clusterService.state(), request); + validate(action, clusterService.state(), params); final Assignment assignment; - assignment = getAssignement(action, currentState, request); - return update(currentState, builder.addTask(taskId, action, request, assignment)); + assignment = getAssignement(action, currentState, params); + return update(currentState, builder.addTask(taskId, action, params, assignment)); } @Override @@ -191,14 +192,15 @@ public class PersistentTasksClusterService extends AbstractComponent implements }); } - private Assignment getAssignement(String taskName, ClusterState currentState, Request request) { - PersistentTasksExecutor persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName); - return persistentTasksExecutor.getAssignment(request, currentState); + private Assignment getAssignement(String taskName, ClusterState currentState, + @Nullable Params params) { + PersistentTasksExecutor persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName); + return persistentTasksExecutor.getAssignment(params, currentState); } - private void validate(String taskName, ClusterState currentState, Request request) { - PersistentTasksExecutor persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName); - persistentTasksExecutor.validate(request, currentState); + private void validate(String taskName, ClusterState currentState, @Nullable Params params) { + PersistentTasksExecutor persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName); + persistentTasksExecutor.validate(params, currentState); } @Override @@ -215,7 +217,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements } interface ExecutorNodeDecider { - Assignment getAssignment(String action, ClusterState currentState, Request request); + Assignment getAssignment(String action, ClusterState currentState, Params params); } static boolean reassignmentRequired(ClusterChangedEvent event, ExecutorNodeDecider decider) { @@ -231,7 +233,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements if (taskInProgress.needsReassignment(event.state().nodes())) { // there is an unassigned task or task with a disappeared node - we need to try assigning it if (Objects.equals(taskInProgress.getAssignment(), - decider.getAssignment(taskInProgress.getTaskName(), event.state(), taskInProgress.getRequest())) == false) { + decider.getAssignment(taskInProgress.getTaskName(), event.state(), taskInProgress.getParams())) == false) { // it looks like a assignment for at least one task is possible - let's trigger reassignment reassignmentRequired = true; break; @@ -276,7 +278,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements for (PersistentTask task : tasks.tasks()) { if (task.needsReassignment(nodes)) { // there is an unassigned task - we need to try assigning it - Assignment assignment = decider.getAssignment(task.getTaskName(), clusterState, task.getRequest()); + Assignment assignment = decider.getAssignment(task.getTaskName(), clusterState, task.getParams()); if (Objects.equals(assignment, task.getAssignment()) == false) { logger.trace("reassigning task {} from node {} to node {}", task.getId(), task.getAssignment().getExecutorNode(), assignment.getExecutorNode()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java index 2248db7ac3a..bd14eadaf9e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java @@ -63,14 +63,14 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable PERSISTENT_TASKS_PARSER = new ObjectParser<>(TYPE, Builder::new); - private static final ObjectParser, Void> PERSISTENT_TASK_PARSER = + private static final ObjectParser, Void> PERSISTENT_TASK_PARSER = new ObjectParser<>("tasks", TaskBuilder::new); public static final ConstructingObjectParser ASSIGNMENT_PARSER = new ConstructingObjectParser<>("assignment", objects -> new Assignment((String) objects[0], (String) objects[1])); - private static final NamedObjectParser REQUEST_PARSER = - (XContentParser p, Void c, String name) -> p.namedObject(PersistentTaskRequest.class, name, null); + private static final NamedObjectParser PARAMS_PARSER = + (XContentParser p, Void c, String name) -> p.namedObject(PersistentTaskParams.class, name, null); private static final NamedObjectParser STATUS_PARSER = (XContentParser p, Void c, String name) -> p.namedObject(Status.class, name, null); @@ -88,15 +88,15 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable taskBuilder, List objects) -> { + (TaskBuilder taskBuilder, List objects) -> { if (objects.size() != 1) { - throw new IllegalArgumentException("only one request per task is allowed"); + throw new IllegalArgumentException("only one params per task is allowed"); } - taskBuilder.setRequest(objects.get(0)); - }, REQUEST_PARSER, new ParseField("request")); + taskBuilder.setParams(objects.get(0)); + }, PARAMS_PARSER, new ParseField("params")); PERSISTENT_TASK_PARSER.declareNamedObjects( - (TaskBuilder taskBuilder, List objects) -> { + (TaskBuilder taskBuilder, List objects) -> { if (objects.size() != 1) { throw new IllegalArgumentException("only one status per task is allowed"); } @@ -173,10 +173,10 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable PersistentTask getTaskWithId(ClusterState clusterState, String taskId) { + public static PersistentTask getTaskWithId(ClusterState clusterState, String taskId) { PersistentTasksCustomMetaData tasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); if (tasks != null) { - return (PersistentTask) tasks.getTask(taskId); + return (PersistentTask) tasks.getTask(taskId); } return null; } @@ -230,11 +230,12 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable implements Writeable, ToXContent { + public static class PersistentTask implements Writeable, ToXContent { private final String id; private final long allocationId; private final String taskName; - private final Request request; + @Nullable + private final Params params; @Nullable private final Status status; private final Assignment assignment; @@ -242,31 +243,29 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable task, long allocationId, Assignment assignment) { - this(task.id, allocationId, task.taskName, task.request, task.status, + public PersistentTask(PersistentTask task, long allocationId, Assignment assignment) { + this(task.id, allocationId, task.taskName, task.params, task.status, assignment, task.allocationId); } - public PersistentTask(PersistentTask task, Status status) { - this(task.id, task.allocationId, task.taskName, task.request, status, + public PersistentTask(PersistentTask task, Status status) { + this(task.id, task.allocationId, task.taskName, task.params, status, task.assignment, task.allocationId); } - private PersistentTask(String id, long allocationId, String taskName, Request request, + private PersistentTask(String id, long allocationId, String taskName, Params params, Status status, Assignment assignment, Long allocationIdOnLastStatusUpdate) { this.id = id; this.allocationId = allocationId; this.taskName = taskName; - this.request = request; + this.params = params; this.status = status; this.assignment = assignment; this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate; - // Update parent request for starting tasks with correct parent task ID - request.setParentTask("cluster", allocationId); } @SuppressWarnings("unchecked") @@ -274,7 +273,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable { + private static class TaskBuilder { private String id; private long allocationId; private String taskName; - private Request request; + private Params params; private Status status; private Assignment assignment = INITIAL_ASSIGNMENT; private Long allocationIdOnLastStatusUpdate; - public TaskBuilder setId(String id) { + public TaskBuilder setId(String id) { this.id = id; return this; } - public TaskBuilder setAllocationId(long allocationId) { + public TaskBuilder setAllocationId(long allocationId) { this.allocationId = allocationId; return this; } - public TaskBuilder setTaskName(String taskName) { + public TaskBuilder setTaskName(String taskName) { this.taskName = taskName; return this; } - public TaskBuilder setRequest(Request request) { - this.request = request; + public TaskBuilder setParams(Params params) { + this.params = params; return this; } - public TaskBuilder setStatus(Status status) { + public TaskBuilder setStatus(Status status) { this.status = status; return this; } - public TaskBuilder setAssignment(Assignment assignment) { + public TaskBuilder setAssignment(Assignment assignment) { this.assignment = assignment; return this; } - public TaskBuilder setAllocationIdOnLastStatusUpdate(Long allocationIdOnLastStatusUpdate) { + public TaskBuilder setAllocationIdOnLastStatusUpdate(Long allocationIdOnLastStatusUpdate) { this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate; return this; } - public PersistentTask build() { - return new PersistentTask<>(id, allocationId, taskName, request, status, + public PersistentTask build() { + return new PersistentTask<>(id, allocationId, taskName, params, status, assignment, allocationIdOnLastStatusUpdate); } } @@ -486,7 +488,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable entry : tasks.values()) { @@ -526,7 +528,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable Builder setTasks(List> tasks) { + private Builder setTasks(List> tasks) { for (TaskBuilder builder : tasks) { PersistentTask task = builder.build(); this.tasks.put(task.getId(), task); @@ -544,10 +546,10 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable * After the task is added its id can be found by calling {{@link #getLastAllocationId()}} method. */ - public Builder addTask(String taskId, String taskName, Request request, - Assignment assignment) { + public Builder addTask(String taskId, String taskName, Params params, + Assignment assignment) { changed = true; - PersistentTask previousTask = tasks.put(taskId, new PersistentTask<>(taskId, taskName, request, + PersistentTask previousTask = tasks.put(taskId, new PersistentTask<>(taskId, taskName, params, getNextAllocationId(), assignment)); if (previousTask != null) { throw new ResourceAlreadyExistsException("Trying to override task with id {" + taskId + "}"); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutor.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutor.java index 2fc8ecfe9f8..c39cde85425 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutor.java @@ -5,13 +5,14 @@ */ package org.elasticsearch.xpack.persistent; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.transport.TransportResponse.Empty; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; +import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import java.util.function.Predicate; @@ -19,7 +20,7 @@ import java.util.function.Predicate; * An executor of tasks that can survive restart of requesting or executing node. * These tasks are using cluster state rather than only transport service to send requests and responses. */ -public abstract class PersistentTasksExecutor extends AbstractComponent { +public abstract class PersistentTasksExecutor extends AbstractComponent { private final String executor; private final String taskName; @@ -37,11 +38,11 @@ public abstract class PersistentTasksExecutor * The default implementation returns the least loaded data node */ - public Assignment getAssignment(Request request, ClusterState clusterState) { + public Assignment getAssignment(Params params, ClusterState clusterState) { DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, DiscoveryNode::isDataNode); if (discoveryNode == null) { return NO_NODE_FOUND; @@ -74,21 +75,36 @@ public abstract class PersistentTasksExecutor - * Throws an exception if the supplied request cannot be executed on the cluster in the current state. + * Throws an exception if the supplied params cannot be executed on the cluster in the current state. */ - public void validate(Request request, ClusterState clusterState) { + public void validate(Params params, ClusterState clusterState) { } + /** + * Creates a AllocatedPersistentTask for communicating with task manager + */ + protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId, + PersistentTask taskInProgress) { + return new AllocatedPersistentTask(id, type, action, getDescription(taskInProgress), parentTaskId); + } + + /** + * Returns task description that will be available via task manager + */ + protected String getDescription(PersistentTask taskInProgress) { + return "id=" + taskInProgress.getId(); + } + /** * This operation will be executed on the executor node. *

* NOTE: The nodeOperation has to throws an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to * indicate that the persistent task has finished. */ - protected abstract void nodeOperation(AllocatedPersistentTask task, Request request); + protected abstract void nodeOperation(AllocatedPersistentTask task, @Nullable Params params); public String getExecutor() { return executor; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorRegistry.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorRegistry.java index 25781c66a7d..a4bb86c5dc7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorRegistry.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorRegistry.java @@ -31,8 +31,8 @@ public class PersistentTasksExecutorRegistry extends AbstractComponent { } @SuppressWarnings("unchecked") - public PersistentTasksExecutor getPersistentTaskExecutorSafe(String taskName) { - PersistentTasksExecutor executor = (PersistentTasksExecutor) taskExecutors.get(taskName); + public PersistentTasksExecutor getPersistentTaskExecutorSafe(String taskName) { + PersistentTasksExecutor executor = (PersistentTasksExecutor) taskExecutors.get(taskName); if (executor == null) { throw new IllegalStateException("Unknown persistent executor [" + taskName + "]"); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeService.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeService.java index e8d4338356f..92d1e58d73f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeService.java @@ -18,6 +18,8 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskAwareRequest; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; @@ -118,16 +120,36 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu } - private void startTask(PersistentTask taskInProgress) { - PersistentTasksExecutor action = persistentTasksExecutorRegistry.getPersistentTaskExecutorSafe(taskInProgress.getTaskName()); + private void startTask(PersistentTask taskInProgress) { + PersistentTasksExecutor executor = + persistentTasksExecutorRegistry.getPersistentTaskExecutorSafe(taskInProgress.getTaskName()); + + TaskAwareRequest request = new TaskAwareRequest() { + TaskId parentTaskId = new TaskId("cluster", taskInProgress.getAllocationId()); + + @Override + public void setParentTask(TaskId taskId) { + throw new UnsupportedOperationException("parent task if for persistent tasks shouldn't change"); + } + + @Override + public TaskId getParentTask() { + return parentTaskId; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId) { + return executor.createTask(id, type, action, parentTaskId, taskInProgress); + } + }; AllocatedPersistentTask task = (AllocatedPersistentTask) taskManager.register("persistent", taskInProgress.getTaskName() + "[c]", - taskInProgress.getRequest()); + request); boolean processed = false; try { task.init(persistentTasksService, taskManager, logger, taskInProgress.getId(), taskInProgress.getAllocationId()); try { runningTasks.put(taskInProgress.getAllocationId(), task); - nodePersistentTasksExecutor.executeTask(taskInProgress.getRequest(), task, action); + nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), task, executor); } catch (Exception e) { // Submit task failure task.markAsFailed(e); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java index b2c9196b100..c2fb39322f2 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java @@ -45,13 +45,13 @@ public class PersistentTasksService extends AbstractComponent { * Creates the specified persistent task and attempts to assign it to a node. */ @SuppressWarnings("unchecked") - public void startPersistentTask(String taskId, String taskName, Request request, - ActionListener> listener) { - CreatePersistentTaskAction.Request createPersistentActionRequest = - new CreatePersistentTaskAction.Request(taskId, taskName, request); + public void startPersistentTask(String taskId, String taskName, @Nullable Params params, + ActionListener> listener) { + StartPersistentTaskAction.Request createPersistentActionRequest = + new StartPersistentTaskAction.Request(taskId, taskName, params); try { - client.execute(CreatePersistentTaskAction.INSTANCE, createPersistentActionRequest, ActionListener.wrap( - o -> listener.onResponse((PersistentTask) o.getTask()), listener::onFailure)); + client.execute(StartPersistentTaskAction.INSTANCE, createPersistentActionRequest, ActionListener.wrap( + o -> listener.onResponse((PersistentTask) o.getTask()), listener::onFailure)); } catch (Exception e) { listener.onFailure(e); } @@ -170,8 +170,8 @@ public class PersistentTasksService extends AbstractComponent { } } - public interface WaitForPersistentTaskStatusListener - extends ActionListener> { + public interface WaitForPersistentTaskStatusListener + extends ActionListener> { default void onTimeout(TimeValue timeout) { onFailure(new IllegalStateException("timed out after " + timeout)); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/CreatePersistentTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/StartPersistentTaskAction.java similarity index 82% rename from plugin/src/main/java/org/elasticsearch/xpack/persistent/CreatePersistentTaskAction.java rename to plugin/src/main/java/org/elasticsearch/xpack/persistent/StartPersistentTaskAction.java index b545fd6d3c7..22b11a389a5 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/CreatePersistentTaskAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/StartPersistentTaskAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -34,14 +35,14 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; /** * This action can be used to add the record for the persistent action to the cluster state. */ -public class CreatePersistentTaskAction extends Action { + StartPersistentTaskAction.RequestBuilder> { - public static final CreatePersistentTaskAction INSTANCE = new CreatePersistentTaskAction(); - public static final String NAME = "cluster:admin/persistent/create"; + public static final StartPersistentTaskAction INSTANCE = new StartPersistentTaskAction(); + public static final String NAME = "cluster:admin/persistent/start"; - private CreatePersistentTaskAction() { + private StartPersistentTaskAction() { super(NAME); } @@ -59,18 +60,19 @@ public class CreatePersistentTaskAction extends Action { + public static class RequestBuilder extends MasterNodeOperationRequestBuilder { - protected RequestBuilder(ElasticsearchClient client, CreatePersistentTaskAction action) { + protected RequestBuilder(ElasticsearchClient client, StartPersistentTaskAction action) { super(client, action, new Request()); } @@ -161,8 +161,8 @@ public class CreatePersistentTaskAction extends Action listener) { - persistentTasksClusterService.createPersistentTask(request.taskId, request.action, request.request, + persistentTasksClusterService.createPersistentTask(request.taskId, request.action, request.params, new ActionListener>() { @Override diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java index e8ee3731b81..0a1ee400283 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java @@ -18,7 +18,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.action.OpenJobAction; import org.elasticsearch.xpack.ml.notifications.Auditor; -import org.elasticsearch.xpack.persistent.PersistentTaskRequest; +import org.elasticsearch.xpack.persistent.PersistentTaskParams; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; @@ -51,7 +51,7 @@ public class MlAssignmentNotifierTests extends ESTestCase { .build(); Map> tasks = new HashMap<>(); - tasks.put("0L", new PersistentTask("0L", OpenJobAction.NAME, + tasks.put("0L", new PersistentTask("0L", OpenJobAction.NAME, new OpenJobAction.Request("job_id"), 0L, new Assignment("node_id", ""))); MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, @@ -80,7 +80,7 @@ public class MlAssignmentNotifierTests extends ESTestCase { .build(); Map> tasks = new HashMap<>(); - tasks.put("0L", new PersistentTask("0L", OpenJobAction.NAME, + tasks.put("0L", new PersistentTask("0L", OpenJobAction.NAME, new OpenJobAction.Request("job_id"), 0L, new Assignment(null, "no nodes"))); MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java index 18bd4d5ab0d..14682b34e3e 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java @@ -27,7 +27,7 @@ import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobTaskStatus; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; -import org.elasticsearch.xpack.persistent.PersistentTaskRequest; +import org.elasticsearch.xpack.persistent.PersistentTaskParams; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksNodeService; import org.elasticsearch.xpack.security.Security; @@ -74,9 +74,9 @@ public class DatafeedJobsIT extends SecurityIntegTestCase { entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new)); entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData::new)); - entries.add(new NamedWriteableRegistry.Entry(PersistentTaskRequest.class, StartDatafeedAction.NAME, + entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, StartDatafeedAction.NAME, StartDatafeedAction.Request::new)); - entries.add(new NamedWriteableRegistry.Entry(PersistentTaskRequest.class, OpenJobAction.NAME, OpenJobAction.Request::new)); + entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.NAME, OpenJobAction.Request::new)); entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new)); entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, JobTaskStatus.NAME, JobTaskStatus::new)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java index ee505195619..9375327d823 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java @@ -17,7 +17,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; -import org.elasticsearch.xpack.persistent.PersistentTaskRequest; +import org.elasticsearch.xpack.persistent.PersistentTaskParams; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; @@ -54,7 +54,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe } public void testValidate() { - PersistentTask task = new PersistentTask("datafeed-foo", StartDatafeedAction.NAME, + PersistentTask task = new PersistentTask("datafeed-foo", StartDatafeedAction.NAME, new StartDatafeedAction.Request("foo", 0L), 1L, new PersistentTasksCustomMetaData.Assignment("node_id", "")); task = new PersistentTask<>(task, DatafeedState.STARTED); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("datafeed-foo", task)); @@ -75,7 +75,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe public void testValidate_alreadyStopped() { PersistentTasksCustomMetaData tasks; if (randomBoolean()) { - PersistentTask task = new PersistentTask("1L", StartDatafeedAction.NAME, + PersistentTask task = new PersistentTask("1L", StartDatafeedAction.NAME, new StartDatafeedAction.Request("foo2", 0L), 1L, new PersistentTasksCustomMetaData.Assignment("node_id", "")); tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("1L", task)); } else { @@ -97,7 +97,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe Map> taskMap = new HashMap<>(); Builder mlMetadataBuilder = new MlMetadata.Builder(); - PersistentTask task = new PersistentTask("datafeed-datafeed_1", StartDatafeedAction.NAME, + PersistentTask task = new PersistentTask("datafeed-datafeed_1", StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_1", 0L), 1L, new PersistentTasksCustomMetaData.Assignment("node_id", "")); task = new PersistentTask<>(task, DatafeedState.STARTED); taskMap.put("datafeed-datafeed_1", task); @@ -105,7 +105,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed_1", "job_id_1").build(); mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); - task = new PersistentTask("datafeed-datafeed_2", StartDatafeedAction.NAME, + task = new PersistentTask("datafeed-datafeed_2", StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_2", 0L), 2L, new PersistentTasksCustomMetaData.Assignment("node_id", "")); task = new PersistentTask<>(task, DatafeedState.STOPPED); taskMap.put("datafeed-datafeed_2", task); @@ -113,7 +113,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe datafeedConfig = createDatafeedConfig("datafeed_2", "job_id_2").build(); mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); - task = new PersistentTask("datafeed-datafeed_3", StartDatafeedAction.NAME, + task = new PersistentTask("3L", StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_3", 0L), 3L, new PersistentTasksCustomMetaData.Assignment("node_id", "")); task = new PersistentTask<>(task, DatafeedState.STARTED); taskMap.put("datafeed-datafeed_3", task); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterServiceTests.java index fe634319eec..857186549fc 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterServiceTests.java @@ -21,7 +21,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; -import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestRequest; +import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestParams; import java.util.ArrayList; import java.util.Arrays; @@ -54,9 +54,9 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { assertThat(dumpEvent(event), PersistentTasksClusterService.reassignmentRequired(event, new PersistentTasksClusterService.ExecutorNodeDecider() { @Override - public Assignment getAssignment( - String action, ClusterState currentState, Request request) { - if ("never_assign".equals(((TestRequest) request).getTestParam())) { + public Assignment getAssignment( + String action, ClusterState currentState, Params params) { + if ("never_assign".equals(((TestParams) params).getTestParam())) { return NO_NODE_FOUND; } return randomNodeAssignment(currentState.nodes()); @@ -171,10 +171,10 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { return PersistentTasksClusterService.reassignTasks(clusterState, logger, new PersistentTasksClusterService.ExecutorNodeDecider() { @Override - public Assignment getAssignment( - String action, ClusterState currentState, Request request) { - TestRequest testRequest = (TestRequest) request; - switch (testRequest.getTestParam()) { + public Assignment getAssignment( + String action, ClusterState currentState, Params params) { + TestParams testParams = (TestParams) params; + switch (testParams.getTestParam()) { case "assign_me": return randomNodeAssignment(currentState.nodes()); case "dont_assign_me": @@ -185,7 +185,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { case "assign_one": return assignOnlyOneTaskAtATime(currentState); default: - fail("unknown param " + testRequest.getTestParam()); + fail("unknown param " + testParams.getTestParam()); } return NO_NODE_FOUND; } @@ -279,7 +279,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(tasks); for (PersistentTask task : tasks.tasks()) { // Remove all unassigned tasks that cause changing assignments they might trigger a significant change - if ("never_assign".equals(((TestRequest) task.getRequest()).getTestParam()) && + if ("never_assign".equals(((TestParams) task.getParams()).getTestParam()) && "change me".equals(task.getAssignment().getExplanation())) { logger.info("removed task with changing assignment {}", task.getId()); tasksBuilder.removeTask(task.getId()); @@ -346,7 +346,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { logger.info("removed all unassigned tasks and changed routing table"); if (tasks != null) { for (PersistentTask task : tasks.tasks()) { - if (task.getExecutorNode() == null || "never_assign".equals(((TestRequest) task.getRequest()).getTestParam())) { + if (task.getExecutorNode() == null || "never_assign".equals(((TestParams) task.getParams()).getTestParam())) { tasksBuilder.removeTask(task.getId()); } } @@ -368,7 +368,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { } return tasks.tasks().stream().anyMatch(task -> { if (task.getExecutorNode() == null || discoveryNodes.nodeExists(task.getExecutorNode())) { - return "never_assign".equals(((TestRequest) task.getRequest()).getTestParam()) == false; + return "never_assign".equals(((TestParams) task.getParams()).getTestParam()) == false; } return false; }); @@ -390,11 +390,11 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { MetaData.Builder metaData, PersistentTasksCustomMetaData.Builder tasks, Assignment assignment, String param) { return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasksCustomMetaData.TYPE, - tasks.addTask(UUIDs.base64UUID(), randomAlphaOfLength(10), new TestRequest(param), assignment).build())); + tasks.addTask(UUIDs.base64UUID(), randomAlphaOfLength(10), new TestParams(param), assignment).build())); } private void addTask(PersistentTasksCustomMetaData.Builder tasks, String action, String param, String node) { - tasks.addTask(UUIDs.base64UUID(), action, new TestRequest(param), new Assignment(node, "explanation: " + action)); + tasks.addTask(UUIDs.base64UUID(), action, new TestParams(param), new Assignment(node, "explanation: " + action)); } private DiscoveryNode newNode(String nodeId) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java index f6c0e594e56..8ef4e23c93c 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java @@ -28,7 +28,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Builder; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.Status; import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; -import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestRequest; +import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestParams; import java.io.IOException; import java.util.ArrayList; @@ -47,7 +47,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(); for (int i = 0; i < numberOfTasks; i++) { String taskId = UUIDs.base64UUID(); - tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, new TestRequest(randomAlphaOfLength(10)), + tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams(randomAlphaOfLength(10)), randomAssignment()); if (randomBoolean()) { // From time to time update status @@ -67,7 +67,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ return new NamedWriteableRegistry(Arrays.asList( new Entry(MetaData.Custom.class, PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData::new), new Entry(NamedDiff.class, PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData::readDiffFrom), - new Entry(PersistentTaskRequest.class, TestPersistentTasksExecutor.NAME, TestRequest::new), + new Entry(PersistentTaskParams.class, TestPersistentTasksExecutor.NAME, TestParams::new), new Entry(Task.Status.class, Status.NAME, Status::new) )); } @@ -138,7 +138,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ private String addRandomTask(Builder builder) { String taskId = UUIDs.base64UUID(); - builder.addTask(taskId, TestPersistentTasksExecutor.NAME, new TestRequest(randomAlphaOfLength(10)), randomAssignment()); + builder.addTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams(randomAlphaOfLength(10)), randomAssignment()); return taskId; } @@ -149,8 +149,8 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ @Override protected NamedXContentRegistry xContentRegistry() { return new NamedXContentRegistry(Arrays.asList( - new NamedXContentRegistry.Entry(PersistentTaskRequest.class, new ParseField(TestPersistentTasksExecutor.NAME), - TestRequest::fromXContent), + new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(TestPersistentTasksExecutor.NAME), + TestParams::fromXContent), new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(Status.NAME), Status::fromXContent) )); } @@ -175,14 +175,14 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ assertEquals(testInstance.tasks().size(), newInstance.tasks().size()); for (PersistentTask testTask : testInstance.tasks()) { - PersistentTask newTask = (PersistentTask) newInstance.getTask(testTask.getId()); + PersistentTask newTask = (PersistentTask) newInstance.getTask(testTask.getId()); assertNotNull(newTask); // Things that should be serialized assertEquals(testTask.getTaskName(), newTask.getTaskName()); assertEquals(testTask.getId(), newTask.getId()); assertEquals(testTask.getStatus(), newTask.getStatus()); - assertEquals(testTask.getRequest(), newTask.getRequest()); + assertEquals(testTask.getParams(), newTask.getParams()); // Things that shouldn't be serialized assertEquals(0, newTask.getAllocationId()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorFullRestartIT.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorFullRestartIT.java index 6b0d574d387..aca9f20c112 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorFullRestartIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorFullRestartIT.java @@ -12,7 +12,7 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; -import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestRequest; +import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestParams; import java.util.ArrayList; import java.util.Collection; @@ -46,13 +46,14 @@ public class PersistentTasksExecutorFullRestartIT extends ESIntegTestCase { PersistentTasksService service = internalCluster().getInstance(PersistentTasksService.class); int numberOfTasks = randomIntBetween(1, 10); String[] taskIds = new String[numberOfTasks]; - List>> futures = new ArrayList<>(numberOfTasks); + List>> futures = new ArrayList<>(numberOfTasks); for (int i = 0; i < numberOfTasks; i++) { - PlainActionFuture> future = new PlainActionFuture<>(); + PlainActionFuture> future = new PlainActionFuture<>(); futures.add(future); taskIds[i] = UUIDs.base64UUID(); - service.startPersistentTask(taskIds[i], TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); + service.startPersistentTask(taskIds[i], TestPersistentTasksExecutor.NAME, randomBoolean() ? null : new TestParams("Blah"), + future); } for (int i = 0; i < numberOfTasks; i++) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java index 91355980309..19fc3a80f30 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java @@ -19,7 +19,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Persiste import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener; import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.Status; import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; -import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestRequest; +import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestParams; import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestTasksRequestBuilder; import org.junit.After; @@ -55,15 +55,15 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { assertNoRunningTasks(); } - public static class WaitForPersistentTaskStatusFuture - extends PlainActionFuture> - implements WaitForPersistentTaskStatusListener { + public static class WaitForPersistentTaskStatusFuture + extends PlainActionFuture> + implements WaitForPersistentTaskStatusListener { } public void testPersistentActionFailure() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); - PlainActionFuture> future = new PlainActionFuture<>(); - persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); + PlainActionFuture> future = new PlainActionFuture<>(); + persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future); long allocationId = future.get().getAllocationId(); assertBusy(() -> { // Wait for the task to start @@ -92,29 +92,31 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { public void testPersistentActionCompletion() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); - PlainActionFuture> future = new PlainActionFuture<>(); - persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); - long taskId = future.get().getAllocationId(); + PlainActionFuture> future = new PlainActionFuture<>(); + String taskId = UUIDs.base64UUID(); + persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future); + long allocationId = future.get().getAllocationId(); assertBusy(() -> { // Wait for the task to start assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get() .getTasks().size(), equalTo(1)); }); TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]") - .get().getTasks().get(0); + .setDetailed(true).get().getTasks().get(0); logger.info("Found running task with id {} and parent {}", firstRunningTask.getId(), firstRunningTask.getParentTaskId()); - // Verifying parent - assertThat(firstRunningTask.getParentTaskId().getId(), equalTo(taskId)); + // Verifying parent and description + assertThat(firstRunningTask.getParentTaskId().getId(), equalTo(allocationId)); assertThat(firstRunningTask.getParentTaskId().getNodeId(), equalTo("cluster")); + assertThat(firstRunningTask.getDescription(), equalTo("id=" + taskId)); stopOrCancelTask(firstRunningTask.getTaskId()); } public void testPersistentActionWithNoAvailableNode() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); - PlainActionFuture> future = new PlainActionFuture<>(); - TestRequest testRequest = new TestRequest("Blah"); - testRequest.setExecutorNodeAttr("test"); - persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testRequest, future); + PlainActionFuture> future = new PlainActionFuture<>(); + TestParams testParams = new TestParams("Blah"); + testParams.setExecutorNodeAttr("test"); + persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, future); String taskId = future.get().getId(); Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build(); @@ -147,8 +149,8 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { public void testPersistentActionStatusUpdate() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); - PlainActionFuture> future = new PlainActionFuture<>(); - persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); + PlainActionFuture> future = new PlainActionFuture<>(); + persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future); String taskId = future.get().getId(); assertBusy(() -> { @@ -205,13 +207,13 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { public void testCreatePersistentTaskWithDuplicateId() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); - PlainActionFuture> future = new PlainActionFuture<>(); + PlainActionFuture> future = new PlainActionFuture<>(); String taskId = UUIDs.base64UUID(); - persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); + persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future); future.get(); - PlainActionFuture> future2 = new PlainActionFuture<>(); - persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future2); + PlainActionFuture> future2 = new PlainActionFuture<>(); + persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future2); assertThrows(future2, ResourceAlreadyExistsException.class); assertBusy(() -> { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorResponseTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorResponseTests.java index 90e5eaaa784..c8c286b1db8 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorResponseTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorResponseTests.java @@ -20,8 +20,8 @@ public class PersistentTasksExecutorResponseTests extends AbstractStreamableTest protected PersistentTaskResponse createTestInstance() { if (randomBoolean()) { return new PersistentTaskResponse( - new PersistentTask(UUIDs.base64UUID(), randomAsciiOfLength(10), - new TestPersistentTasksPlugin.TestRequest("test"), + new PersistentTask(UUIDs.base64UUID(), randomAsciiOfLength(10), + new TestPersistentTasksPlugin.TestParams("test"), randomLong(), PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT)); } else { return new PersistentTaskResponse(null); @@ -36,7 +36,8 @@ public class PersistentTasksExecutorResponseTests extends AbstractStreamableTest @Override protected NamedWriteableRegistry getNamedWriteableRegistry() { return new NamedWriteableRegistry(Collections.singletonList( - new NamedWriteableRegistry.Entry(PersistentTaskRequest.class, TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME, TestPersistentTasksPlugin.TestRequest::new) + new NamedWriteableRegistry.Entry(PersistentTaskParams.class, + TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME, TestPersistentTasksPlugin.TestParams::new) )); } } \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java index 41b490e2b19..003b5decc3c 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java @@ -18,12 +18,13 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; -import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestRequest; +import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestParams; import java.io.IOException; import java.util.ArrayList; @@ -34,6 +35,10 @@ import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.empty; import static org.hamcrest.core.IsEqual.equalTo; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -59,12 +64,18 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { public void testStartTask() throws Exception { ClusterService clusterService = createClusterService(); PersistentTasksService persistentTasksService = mock(PersistentTasksService.class); - @SuppressWarnings("unchecked") PersistentTasksExecutor action = mock(PersistentTasksExecutor.class); + @SuppressWarnings("unchecked") PersistentTasksExecutor action = mock(PersistentTasksExecutor.class); when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME); when(action.getTaskName()).thenReturn("test"); + int nonLocalNodesCount = randomInt(10); + // need to account for 5 original tasks on each node and their relocations + for (int i = 0; i < (nonLocalNodesCount + 1) * 10; i++) { + TaskId parentId = new TaskId("cluster", i); + when(action.createTask(anyLong(), anyString(), anyString(), eq(parentId), any())).thenReturn( + new TestPersistentTasksPlugin.TestTask(i, "persistent", "test", "", parentId)); + } PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Collections.singletonList(action)); - int nonLocalNodesCount = randomInt(10); MockExecutor executor = new MockExecutor(); PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService, registry, new TaskManager(Settings.EMPTY), executor); @@ -76,11 +87,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { boolean added = false; if (nonLocalNodesCount > 0) { for (int i = 0; i < randomInt(5); i++) { - tasks.addTask(UUIDs.base64UUID(), "test_action", new TestRequest("other_" + i), + tasks.addTask(UUIDs.base64UUID(), "test_action", new TestParams("other_" + i), new Assignment("other_node_" + randomInt(nonLocalNodesCount), "test assignment on other node")); if (added == false && randomBoolean()) { added = true; - tasks.addTask(UUIDs.base64UUID(), "test", new TestRequest("this_param"), + tasks.addTask(UUIDs.base64UUID(), "test", new TestParams("this_param"), new Assignment("this_node", "test assignment on this node")); } } @@ -101,7 +112,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { // Add task on some other node state = newClusterState; - newClusterState = addTask(state, "test", new TestRequest(), "some_other_node"); + newClusterState = addTask(state, "test", null, "some_other_node"); coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state)); // Make sure action wasn't called again @@ -109,7 +120,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { // Start another task on this node state = newClusterState; - newClusterState = addTask(state, "test", new TestRequest("this_param"), "this_node"); + newClusterState = addTask(state, "test", new TestParams("this_param"), "this_node"); coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state)); // Make sure action was called this time @@ -124,7 +135,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { // Add task on some other node state = newClusterState; - newClusterState = addTask(state, "test", new TestRequest(), "some_other_node"); + newClusterState = addTask(state, "test", null, "some_other_node"); coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state)); // Make sure action wasn't called again @@ -162,9 +173,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { fail("Shouldn't be called during Cluster State cancellation"); } }; - @SuppressWarnings("unchecked") PersistentTasksExecutor action = mock(PersistentTasksExecutor.class); + @SuppressWarnings("unchecked") PersistentTasksExecutor action = mock(PersistentTasksExecutor.class); when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME); when(action.getTaskName()).thenReturn("test"); + when(action.createTask(anyLong(), anyString(), anyString(), any(), any())) + .thenReturn(new TestPersistentTasksPlugin.TestTask(1, "persistent", "test", "", new TaskId("cluster", 1))); PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Collections.singletonList(action)); int nonLocalNodesCount = randomInt(10); @@ -179,7 +192,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { ClusterState newClusterState = state; // Allocate first task state = newClusterState; - newClusterState = addTask(state, "test", new TestRequest(), "this_node"); + newClusterState = addTask(state, "test", null, "this_node"); coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state)); // Check the the task is know to the task manager @@ -222,12 +235,12 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { } - private ClusterState addTask(ClusterState state, String action, Request request, - String node) { + private ClusterState addTask(ClusterState state, String action, Params params, + String node) { PersistentTasksCustomMetaData.Builder builder = PersistentTasksCustomMetaData.builder(state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, - builder.addTask(UUIDs.base64UUID(), action, request, new Assignment(node, "test assignment")).build())).build(); + builder.addTask(UUIDs.base64UUID(), action, params, new Assignment(node, "test assignment")).build())).build(); } private ClusterState reallocateTask(ClusterState state, String taskId, String node) { @@ -247,12 +260,12 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { } private class Execution { - private final PersistentTaskRequest request; + private final PersistentTaskParams params; private final AllocatedPersistentTask task; private final PersistentTasksExecutor holder; - Execution(PersistentTaskRequest request, AllocatedPersistentTask task, PersistentTasksExecutor holder) { - this.request = request; + Execution(PersistentTaskParams params, AllocatedPersistentTask task, PersistentTasksExecutor holder) { + this.params = params; this.task = task; this.holder = holder; } @@ -266,9 +279,9 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { } @Override - public void executeTask(Request request, AllocatedPersistentTask task, - PersistentTasksExecutor action) { - executions.add(new Execution(request, task, action)); + public void executeTask(Params params, AllocatedPersistentTask task, + PersistentTasksExecutor executor) { + executions.add(new Execution(params, task, executor)); } public Execution get(int i) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/StartPersistentActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/StartPersistentActionRequestTests.java index 436ef2160b1..b75882d3ddf 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/StartPersistentActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/StartPersistentActionRequestTests.java @@ -8,9 +8,9 @@ package org.elasticsearch.xpack.persistent; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry; -import org.elasticsearch.xpack.persistent.CreatePersistentTaskAction.Request; +import org.elasticsearch.xpack.persistent.StartPersistentTaskAction.Request; import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; -import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestRequest; +import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestParams; import org.elasticsearch.test.AbstractStreamableTestCase; import java.util.Collections; @@ -19,17 +19,19 @@ public class StartPersistentActionRequestTests extends AbstractStreamableTestCas @Override protected Request createTestInstance() { - TestRequest testRequest = new TestRequest(); + TestParams testParams; if (randomBoolean()) { - testRequest.setTestParam(randomAlphaOfLengthBetween(1, 20)); + testParams = new TestParams(); + if (randomBoolean()) { + testParams.setTestParam(randomAlphaOfLengthBetween(1, 20)); + } + if (randomBoolean()) { + testParams.setExecutorNodeAttr(randomAlphaOfLengthBetween(1, 20)); + } + } else { + testParams = null; } - if (randomBoolean()) { - testRequest.setParentTask(randomAlphaOfLengthBetween(1, 20), randomLong()); - } - if (randomBoolean()) { - testRequest.setExecutorNodeAttr(randomAlphaOfLengthBetween(1, 20)); - } - return new Request(UUIDs.base64UUID(), randomAlphaOfLengthBetween(1, 20), new TestRequest()); + return new Request(UUIDs.base64UUID(), randomAlphaOfLengthBetween(1, 20), testParams); } @Override @@ -40,7 +42,7 @@ public class StartPersistentActionRequestTests extends AbstractStreamableTestCas @Override protected NamedWriteableRegistry getNamedWriteableRegistry() { return new NamedWriteableRegistry(Collections.singletonList( - new Entry(PersistentTaskRequest.class, TestPersistentTasksExecutor.NAME, TestRequest::new) + new Entry(PersistentTaskParams.class, TestPersistentTasksExecutor.NAME, TestParams::new) )); } } \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentTasksPlugin.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentTasksPlugin.java index 10783fe973e..ddc37b3d5bc 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentTasksPlugin.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentTasksPlugin.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.persistent; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; @@ -78,7 +77,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { public List> getActions() { return Arrays.asList( new ActionHandler<>(TestTaskAction.INSTANCE, TransportTestTaskAction.class), - new ActionHandler<>(CreatePersistentTaskAction.INSTANCE, CreatePersistentTaskAction.TransportAction.class), + new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class), new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class), new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class), new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class) @@ -104,7 +103,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { @Override public List getNamedWriteables() { return Arrays.asList( - new NamedWriteableRegistry.Entry(PersistentTaskRequest.class, TestPersistentTasksExecutor.NAME, TestRequest::new), + new NamedWriteableRegistry.Entry(PersistentTaskParams.class, TestPersistentTasksExecutor.NAME, TestParams::new), new NamedWriteableRegistry.Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new), new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasksCustomMetaData.TYPE, @@ -120,16 +119,16 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { return Arrays.asList( new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(PersistentTasksCustomMetaData.TYPE), PersistentTasksCustomMetaData::fromXContent), - new NamedXContentRegistry.Entry(PersistentTaskRequest.class, new ParseField(TestPersistentTasksExecutor.NAME), - TestRequest::fromXContent), + new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(TestPersistentTasksExecutor.NAME), + TestParams::fromXContent), new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(Status.NAME), Status::fromXContent) ); } - public static class TestRequest extends PersistentTaskRequest { + public static class TestParams implements PersistentTaskParams { - public static final ConstructingObjectParser REQUEST_PARSER = - new ConstructingObjectParser<>(TestPersistentTasksExecutor.NAME, args -> new TestRequest((String) args[0])); + public static final ConstructingObjectParser REQUEST_PARSER = + new ConstructingObjectParser<>(TestPersistentTasksExecutor.NAME, args -> new TestParams((String) args[0])); static { REQUEST_PARSER.declareString(constructorArg(), new ParseField("param")); @@ -141,21 +140,18 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { private String testParam = null; - public TestRequest() { + public TestParams() { } - public TestRequest(String testParam) { + public TestParams(String testParam) { this.testParam = testParam; } - public TestRequest(StreamInput in) throws IOException { - readFrom(in); - } - - @Override - public ActionRequestValidationException validate() { - return null; + public TestParams(StreamInput in) throws IOException { + executorNodeAttr = in.readOptionalString(); + responseNode = in.readOptionalString(); + testParam = in.readOptionalString(); } @Override @@ -181,20 +177,11 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); out.writeOptionalString(executorNodeAttr); out.writeOptionalString(responseNode); out.writeOptionalString(testParam); } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - executorNodeAttr = in.readOptionalString(); - responseNode = in.readOptionalString(); - testParam = in.readOptionalString(); - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -203,7 +190,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { return builder; } - public static TestRequest fromXContent(XContentParser parser) throws IOException { + public static TestParams fromXContent(XContentParser parser) throws IOException { return REQUEST_PARSER.parse(parser, null); } @@ -211,7 +198,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - TestRequest that = (TestRequest) o; + TestParams that = (TestParams) o; return Objects.equals(executorNodeAttr, that.executorNodeAttr) && Objects.equals(responseNode, that.responseNode) && Objects.equals(testParam, that.testParam); @@ -221,11 +208,6 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { public int hashCode() { return Objects.hash(executorNodeAttr, responseNode, testParam); } - - @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new TestTask(id, type, action, getDescription(), parentTaskId); - } } public static class Status implements Task.Status { @@ -298,7 +280,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { } - public static class TestPersistentTasksExecutor extends PersistentTasksExecutor { + public static class TestPersistentTasksExecutor extends PersistentTasksExecutor { public static final String NAME = "cluster:admin/persistent/test"; private final ClusterService clusterService; @@ -309,12 +291,12 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { } @Override - public Assignment getAssignment(TestRequest request, ClusterState clusterState) { - if (request.getExecutorNodeAttr() == null) { - return super.getAssignment(request, clusterState); + public Assignment getAssignment(TestParams params, ClusterState clusterState) { + if (params == null || params.getExecutorNodeAttr() == null) { + return super.getAssignment(params, clusterState); } else { DiscoveryNode executorNode = selectLeastLoadedNode(clusterState, - discoveryNode -> request.getExecutorNodeAttr().equals(discoveryNode.getAttributes().get("test_attr"))); + discoveryNode -> params.getExecutorNodeAttr().equals(discoveryNode.getAttributes().get("test_attr"))); if (executorNode != null) { return new Assignment(executorNode.getId(), "test assignment"); } else { @@ -325,7 +307,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { } @Override - protected void nodeOperation(AllocatedPersistentTask task, TestRequest request) { + protected void nodeOperation(AllocatedPersistentTask task, TestParams params) { logger.info("started node operation for the task {}", task); try { TestTask testTask = (TestTask) task; @@ -385,6 +367,12 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { task.markAsFailed(e); } } + + @Override + protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId, + PersistentTask task) { + return new TestTask(id, type, action, getDescription(task), parentTaskId); + } } public static class TestTaskAction extends Action { diff --git a/plugin/src/test/resources/org/elasticsearch/transport/actions b/plugin/src/test/resources/org/elasticsearch/transport/actions index e2ad253d31e..66c00666f16 100644 --- a/plugin/src/test/resources/org/elasticsearch/transport/actions +++ b/plugin/src/test/resources/org/elasticsearch/transport/actions @@ -139,7 +139,7 @@ cluster:admin/xpack/ml/job/update indices:internal/data/write/xpackdeletebyquery cluster:internal/xpack/ml/job/update/process cluster:admin/xpack/ml/delete_expired_data -cluster:admin/persistent/create +cluster:admin/persistent/start cluster:admin/persistent/completion cluster:admin/persistent/update_status cluster:admin/persistent/remove