Persistent Tasks: PersistentTaskRequest -> PersistTaskParams ()

Removes the last pieces of ActionRequest from PersistentTaskRequest and renames it into PersistTaskParams, which is now just an interface that extends NamedWriteable and ToXContent.
This commit is contained in:
Igor Motov 2017-04-12 09:58:15 -04:00 committed by Martijn van Groningen
parent 6bfea09dd6
commit abd9ae399c
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
17 changed files with 300 additions and 251 deletions

@ -18,6 +18,7 @@
*/
package org.elasticsearch.persistent;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;
@ -33,10 +34,10 @@ public class NodePersistentTasksExecutor {
this.threadPool = threadPool;
}
public <Request extends PersistentTaskRequest> void executeTask(Request request,
public <Params extends PersistentTaskParams> void executeTask(@Nullable Params params,
AllocatedPersistentTask task,
PersistentTasksExecutor<Request> action) {
threadPool.executor(action.getExecutor()).execute(new AbstractRunnable() {
PersistentTasksExecutor<Params> executor) {
threadPool.executor(executor.getExecutor()).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
task.markAsFailed(e);
@ -46,7 +47,7 @@ public class NodePersistentTasksExecutor {
@Override
protected void doRun() throws Exception {
try {
action.nodeOperation(task, request);
executor.nodeOperation(task, params);
} catch (Exception ex) {
task.markAsFailed(ex);
}

@ -16,20 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.persistent;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
/**
* Base class for a request for a persistent task
* Parameters used to start persistent task
*/
public abstract class PersistentTaskRequest extends ActionRequest implements NamedWriteable, ToXContent {
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new AllocatedPersistentTask(id, type, action, getDescription(), parentTaskId);
}
public interface PersistentTaskParams extends NamedWriteable, ToXContent {
}

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
@ -58,10 +59,10 @@ public class PersistentTasksClusterService extends AbstractComponent implements
* Creates a new persistent task on master node
*
* @param action the action name
* @param request request
* @param params params
* @param listener the listener that will be called when task is started
*/
public <Request extends PersistentTaskRequest> void createPersistentTask(String taskId, String action, Request request,
public <Params extends PersistentTaskParams> void createPersistentTask(String taskId, String action, @Nullable Params params,
ActionListener<PersistentTask<?>> listener) {
clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() {
@Override
@ -70,10 +71,10 @@ public class PersistentTasksClusterService extends AbstractComponent implements
if (builder.hasTask(taskId)) {
throw new ResourceAlreadyExistsException("task with id {" + taskId + "} already exist");
}
validate(action, clusterService.state(), request);
validate(action, clusterService.state(), params);
final Assignment assignment;
assignment = getAssignement(action, currentState, request);
return update(currentState, builder.addTask(taskId, action, request, assignment));
assignment = getAssignement(action, currentState, params);
return update(currentState, builder.addTask(taskId, action, params, assignment));
}
@Override
@ -205,14 +206,15 @@ public class PersistentTasksClusterService extends AbstractComponent implements
});
}
private <Request extends PersistentTaskRequest> Assignment getAssignement(String taskName, ClusterState currentState, Request request) {
PersistentTasksExecutor<Request> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);
return persistentTasksExecutor.getAssignment(request, currentState);
private <Params extends PersistentTaskParams> Assignment getAssignement(String taskName, ClusterState currentState,
@Nullable Params params) {
PersistentTasksExecutor<Params> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);
return persistentTasksExecutor.getAssignment(params, currentState);
}
private <Request extends PersistentTaskRequest> void validate(String taskName, ClusterState currentState, Request request) {
PersistentTasksExecutor<Request> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);
persistentTasksExecutor.validate(request, currentState);
private <Params extends PersistentTaskParams> void validate(String taskName, ClusterState currentState, @Nullable Params params) {
PersistentTasksExecutor<Params> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);
persistentTasksExecutor.validate(params, currentState);
}
@Override
@ -229,7 +231,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
}
interface ExecutorNodeDecider {
<Request extends PersistentTaskRequest> Assignment getAssignment(String action, ClusterState currentState, Request request);
<Params extends PersistentTaskParams> Assignment getAssignment(String action, ClusterState currentState, Params params);
}
static boolean reassignmentRequired(ClusterChangedEvent event, ExecutorNodeDecider decider) {
@ -245,7 +247,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
if (taskInProgress.needsReassignment(event.state().nodes())) {
// there is an unassigned task or task with a disappeared node - we need to try assigning it
if (Objects.equals(taskInProgress.getAssignment(),
decider.getAssignment(taskInProgress.getTaskName(), event.state(), taskInProgress.getRequest())) == false) {
decider.getAssignment(taskInProgress.getTaskName(), event.state(), taskInProgress.getParams())) == false) {
// it looks like a assignment for at least one task is possible - let's trigger reassignment
reassignmentRequired = true;
break;
@ -290,7 +292,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
for (PersistentTask<?> task : tasks.tasks()) {
if (task.needsReassignment(nodes)) {
// there is an unassigned task - we need to try assigning it
Assignment assignment = decider.getAssignment(task.getTaskName(), clusterState, task.getRequest());
Assignment assignment = decider.getAssignment(task.getTaskName(), clusterState, task.getParams());
if (Objects.equals(assignment, task.getAssignment()) == false) {
logger.trace("reassigning task {} from node {} to node {}", task.getId(),
task.getAssignment().getExecutorNode(), assignment.getExecutorNode());

@ -76,14 +76,14 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
private static final ObjectParser<Builder, Void> PERSISTENT_TASKS_PARSER = new ObjectParser<>(TYPE, Builder::new);
private static final ObjectParser<TaskBuilder<PersistentTaskRequest>, Void> PERSISTENT_TASK_PARSER =
private static final ObjectParser<TaskBuilder<PersistentTaskParams>, Void> PERSISTENT_TASK_PARSER =
new ObjectParser<>("tasks", TaskBuilder::new);
public static final ConstructingObjectParser<Assignment, Void> ASSIGNMENT_PARSER =
new ConstructingObjectParser<>("assignment", objects -> new Assignment((String) objects[0], (String) objects[1]));
private static final NamedObjectParser<PersistentTaskRequest, Void> REQUEST_PARSER =
(XContentParser p, Void c, String name) -> p.namedObject(PersistentTaskRequest.class, name, null);
private static final NamedObjectParser<PersistentTaskParams, Void> PARAMS_PARSER =
(XContentParser p, Void c, String name) -> p.namedObject(PersistentTaskParams.class, name, null);
private static final NamedObjectParser<Status, Void> STATUS_PARSER =
(XContentParser p, Void c, String name) -> p.namedObject(Status.class, name, null);
@ -101,15 +101,15 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setTaskName, new ParseField("name"));
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id"));
PERSISTENT_TASK_PARSER.declareNamedObjects(
(TaskBuilder<PersistentTaskRequest> taskBuilder, List<PersistentTaskRequest> objects) -> {
(TaskBuilder<PersistentTaskParams> taskBuilder, List<PersistentTaskParams> objects) -> {
if (objects.size() != 1) {
throw new IllegalArgumentException("only one request per task is allowed");
throw new IllegalArgumentException("only one params per task is allowed");
}
taskBuilder.setRequest(objects.get(0));
}, REQUEST_PARSER, new ParseField("request"));
taskBuilder.setParams(objects.get(0));
}, PARAMS_PARSER, new ParseField("params"));
PERSISTENT_TASK_PARSER.declareNamedObjects(
(TaskBuilder<PersistentTaskRequest> taskBuilder, List<Status> objects) -> {
(TaskBuilder<PersistentTaskParams> taskBuilder, List<Status> objects) -> {
if (objects.size() != 1) {
throw new IllegalArgumentException("only one status per task is allowed");
}
@ -186,10 +186,10 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
}
@SuppressWarnings("unchecked")
public static <Request extends PersistentTaskRequest> PersistentTask<Request> getTaskWithId(ClusterState clusterState, String taskId) {
public static <Params extends PersistentTaskParams> PersistentTask<Params> getTaskWithId(ClusterState clusterState, String taskId) {
PersistentTasksCustomMetaData tasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
if (tasks != null) {
return (PersistentTask<Request>) tasks.getTask(taskId);
return (PersistentTask<Params>) tasks.getTask(taskId);
}
return null;
}
@ -243,11 +243,12 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
/**
* A record that represents a single running persistent task
*/
public static class PersistentTask<Request extends PersistentTaskRequest> implements Writeable, ToXContent {
public static class PersistentTask<Params extends PersistentTaskParams> implements Writeable, ToXContent {
private final String id;
private final long allocationId;
private final String taskName;
private final Request request;
@Nullable
private final Params params;
@Nullable
private final Status status;
private final Assignment assignment;
@ -255,31 +256,29 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
private final Long allocationIdOnLastStatusUpdate;
public PersistentTask(String id, String taskName, Request request, long allocationId, Assignment assignment) {
this(id, allocationId, taskName, request, null, assignment, null);
public PersistentTask(String id, String taskName, Params params, long allocationId, Assignment assignment) {
this(id, allocationId, taskName, params, null, assignment, null);
}
public PersistentTask(PersistentTask<Request> task, long allocationId, Assignment assignment) {
this(task.id, allocationId, task.taskName, task.request, task.status,
public PersistentTask(PersistentTask<Params> task, long allocationId, Assignment assignment) {
this(task.id, allocationId, task.taskName, task.params, task.status,
assignment, task.allocationId);
}
public PersistentTask(PersistentTask<Request> task, Status status) {
this(task.id, task.allocationId, task.taskName, task.request, status,
public PersistentTask(PersistentTask<Params> task, Status status) {
this(task.id, task.allocationId, task.taskName, task.params, status,
task.assignment, task.allocationId);
}
private PersistentTask(String id, long allocationId, String taskName, Request request,
private PersistentTask(String id, long allocationId, String taskName, Params params,
Status status, Assignment assignment, Long allocationIdOnLastStatusUpdate) {
this.id = id;
this.allocationId = allocationId;
this.taskName = taskName;
this.request = request;
this.params = params;
this.status = status;
this.assignment = assignment;
this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate;
// Update parent request for starting tasks with correct parent task ID
request.setParentTask("cluster", allocationId);
}
@SuppressWarnings("unchecked")
@ -287,7 +286,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
id = in.readString();
allocationId = in.readLong();
taskName = in.readString();
request = (Request) in.readNamedWriteable(PersistentTaskRequest.class);
params = (Params) in.readOptionalNamedWriteable(PersistentTaskParams.class);
status = in.readOptionalNamedWriteable(Task.Status.class);
assignment = new Assignment(in.readOptionalString(), in.readString());
allocationIdOnLastStatusUpdate = in.readOptionalLong();
@ -298,7 +297,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
out.writeString(id);
out.writeLong(allocationId);
out.writeString(taskName);
out.writeNamedWriteable(request);
out.writeOptionalNamedWriteable(params);
out.writeOptionalNamedWriteable(status);
out.writeOptionalString(assignment.executorNode);
out.writeString(assignment.explanation);
@ -313,7 +312,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return Objects.equals(id, that.id) &&
allocationId == that.allocationId &&
Objects.equals(taskName, that.taskName) &&
Objects.equals(request, that.request) &&
Objects.equals(params, that.params) &&
Objects.equals(status, that.status) &&
Objects.equals(assignment, that.assignment) &&
Objects.equals(allocationIdOnLastStatusUpdate, that.allocationIdOnLastStatusUpdate);
@ -321,7 +320,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
@Override
public int hashCode() {
return Objects.hash(id, allocationId, taskName, request, status, assignment,
return Objects.hash(id, allocationId, taskName, params, status, assignment,
allocationIdOnLastStatusUpdate);
}
@ -342,8 +341,9 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return taskName;
}
public Request getRequest() {
return request;
@Nullable
public Params getParams() {
return params;
}
@Nullable
@ -380,25 +380,27 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params xParams) throws IOException {
builder.startObject();
{
builder.field("id", id);
builder.field("name", taskName);
builder.startObject("request");
if (params != null) {
builder.startObject("params");
{
builder.field(request.getWriteableName(), request, params);
builder.field(params.getWriteableName(), params, xParams);
}
builder.endObject();
}
if (status != null) {
builder.startObject("status");
{
builder.field(status.getWriteableName(), status, params);
builder.field(status.getWriteableName(), status, xParams);
}
builder.endObject();
}
if (API_CONTEXT.equals(params.param(MetaData.CONTEXT_MODE_PARAM, API_CONTEXT))) {
if (API_CONTEXT.equals(xParams.param(MetaData.CONTEXT_MODE_PARAM, API_CONTEXT))) {
// These are transient values that shouldn't be persisted to gateway cluster state or snapshot
builder.field("allocation_id", allocationId);
builder.startObject("assignment");
@ -422,53 +424,53 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
}
}
private static class TaskBuilder<Request extends PersistentTaskRequest> {
private static class TaskBuilder<Params extends PersistentTaskParams> {
private String id;
private long allocationId;
private String taskName;
private Request request;
private Params params;
private Status status;
private Assignment assignment = INITIAL_ASSIGNMENT;
private Long allocationIdOnLastStatusUpdate;
public TaskBuilder<Request> setId(String id) {
public TaskBuilder<Params> setId(String id) {
this.id = id;
return this;
}
public TaskBuilder<Request> setAllocationId(long allocationId) {
public TaskBuilder<Params> setAllocationId(long allocationId) {
this.allocationId = allocationId;
return this;
}
public TaskBuilder<Request> setTaskName(String taskName) {
public TaskBuilder<Params> setTaskName(String taskName) {
this.taskName = taskName;
return this;
}
public TaskBuilder<Request> setRequest(Request request) {
this.request = request;
public TaskBuilder<Params> setParams(Params params) {
this.params = params;
return this;
}
public TaskBuilder<Request> setStatus(Status status) {
public TaskBuilder<Params> setStatus(Status status) {
this.status = status;
return this;
}
public TaskBuilder<Request> setAssignment(Assignment assignment) {
public TaskBuilder<Params> setAssignment(Assignment assignment) {
this.assignment = assignment;
return this;
}
public TaskBuilder<Request> setAllocationIdOnLastStatusUpdate(Long allocationIdOnLastStatusUpdate) {
public TaskBuilder<Params> setAllocationIdOnLastStatusUpdate(Long allocationIdOnLastStatusUpdate) {
this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate;
return this;
}
public PersistentTask<Request> build() {
return new PersistentTask<>(id, allocationId, taskName, request, status,
public PersistentTask<Params> build() {
return new PersistentTask<>(id, allocationId, taskName, params, status,
assignment, allocationIdOnLastStatusUpdate);
}
}
@ -499,7 +501,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("last_allocation_id", lastAllocationId);
builder.startArray("tasks");
for (PersistentTask<?> entry : tasks.values()) {
@ -539,7 +541,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return this;
}
private <Request extends PersistentTaskRequest> Builder setTasks(List<TaskBuilder<Request>> tasks) {
private <Params extends PersistentTaskParams> Builder setTasks(List<TaskBuilder<Params>> tasks) {
for (TaskBuilder builder : tasks) {
PersistentTask<?> task = builder.build();
this.tasks.put(task.getId(), task);
@ -557,10 +559,10 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
* <p>
* After the task is added its id can be found by calling {{@link #getLastAllocationId()}} method.
*/
public <Request extends PersistentTaskRequest> Builder addTask(String taskId, String taskName, Request request,
public <Params extends PersistentTaskParams> Builder addTask(String taskId, String taskName, Params params,
Assignment assignment) {
changed = true;
PersistentTask<?> previousTask = tasks.put(taskId, new PersistentTask<>(taskId, taskName, request,
PersistentTask<?> previousTask = tasks.put(taskId, new PersistentTask<>(taskId, taskName, params,
getNextAllocationId(), assignment));
if (previousTask != null) {
throw new ResourceAlreadyExistsException("Trying to override task with id {" + taskId + "}");

@ -19,13 +19,14 @@
package org.elasticsearch.persistent;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.util.function.Predicate;
@ -33,7 +34,7 @@ import java.util.function.Predicate;
* An executor of tasks that can survive restart of requesting or executing node.
* These tasks are using cluster state rather than only transport service to send requests and responses.
*/
public abstract class PersistentTasksExecutor<Request extends PersistentTaskRequest> extends AbstractComponent {
public abstract class PersistentTasksExecutor<Params extends PersistentTaskParams> extends AbstractComponent {
private final String executor;
private final String taskName;
@ -51,11 +52,11 @@ public abstract class PersistentTasksExecutor<Request extends PersistentTaskRequ
public static final Assignment NO_NODE_FOUND = new Assignment(null, "no appropriate nodes found for the assignment");
/**
* Returns the node id where the request has to be executed,
* Returns the node id where the params has to be executed,
* <p>
* The default implementation returns the least loaded data node
*/
public Assignment getAssignment(Request request, ClusterState clusterState) {
public Assignment getAssignment(Params params, ClusterState clusterState) {
DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, DiscoveryNode::isDataNode);
if (discoveryNode == null) {
return NO_NODE_FOUND;
@ -88,21 +89,36 @@ public abstract class PersistentTasksExecutor<Request extends PersistentTaskRequ
}
/**
* Checks the current cluster state for compatibility with the request
* Checks the current cluster state for compatibility with the params
* <p>
* Throws an exception if the supplied request cannot be executed on the cluster in the current state.
* Throws an exception if the supplied params cannot be executed on the cluster in the current state.
*/
public void validate(Request request, ClusterState clusterState) {
public void validate(Params params, ClusterState clusterState) {
}
/**
* Creates a AllocatedPersistentTask for communicating with task manager
*/
protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
PersistentTask<Params> taskInProgress) {
return new AllocatedPersistentTask(id, type, action, getDescription(taskInProgress), parentTaskId);
}
/**
* Returns task description that will be available via task manager
*/
protected String getDescription(PersistentTask<Params> taskInProgress) {
return "id=" + taskInProgress.getId();
}
/**
* This operation will be executed on the executor node.
* <p>
* NOTE: The nodeOperation has to throws an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to
* indicate that the persistent task has finished.
*/
protected abstract void nodeOperation(AllocatedPersistentTask task, Request request);
protected abstract void nodeOperation(AllocatedPersistentTask task, @Nullable Params params);
public String getExecutor() {
return executor;

@ -44,8 +44,8 @@ public class PersistentTasksExecutorRegistry extends AbstractComponent {
}
@SuppressWarnings("unchecked")
public <Request extends PersistentTaskRequest> PersistentTasksExecutor<Request> getPersistentTaskExecutorSafe(String taskName) {
PersistentTasksExecutor<Request> executor = (PersistentTasksExecutor<Request>) taskExecutors.get(taskName);
public <Params extends PersistentTaskParams> PersistentTasksExecutor<Params> getPersistentTaskExecutorSafe(String taskName) {
PersistentTasksExecutor<Params> executor = (PersistentTasksExecutor<Params>) taskExecutors.get(taskName);
if (executor == null) {
throw new IllegalStateException("Unknown persistent executor [" + taskName + "]");
}

@ -31,6 +31,8 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskAwareRequest;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
@ -131,16 +133,36 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
}
private <Request extends PersistentTaskRequest> void startTask(PersistentTask<Request> taskInProgress) {
PersistentTasksExecutor<Request> action = persistentTasksExecutorRegistry.getPersistentTaskExecutorSafe(taskInProgress.getTaskName());
private <Params extends PersistentTaskParams> void startTask(PersistentTask<Params> taskInProgress) {
PersistentTasksExecutor<Params> executor =
persistentTasksExecutorRegistry.getPersistentTaskExecutorSafe(taskInProgress.getTaskName());
TaskAwareRequest request = new TaskAwareRequest() {
TaskId parentTaskId = new TaskId("cluster", taskInProgress.getAllocationId());
@Override
public void setParentTask(TaskId taskId) {
throw new UnsupportedOperationException("parent task if for persistent tasks shouldn't change");
}
@Override
public TaskId getParentTask() {
return parentTaskId;
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return executor.createTask(id, type, action, parentTaskId, taskInProgress);
}
};
AllocatedPersistentTask task = (AllocatedPersistentTask) taskManager.register("persistent", taskInProgress.getTaskName() + "[c]",
taskInProgress.getRequest());
request);
boolean processed = false;
try {
task.init(persistentTasksService, taskManager, logger, taskInProgress.getId(), taskInProgress.getAllocationId());
try {
runningTasks.put(taskInProgress.getAllocationId(), task);
nodePersistentTasksExecutor.executeTask(taskInProgress.getRequest(), task, action);
nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), task, executor);
} catch (Exception e) {
// Submit task failure
task.markAsFailed(e);

@ -58,13 +58,13 @@ public class PersistentTasksService extends AbstractComponent {
* Creates the specified persistent task and attempts to assign it to a node.
*/
@SuppressWarnings("unchecked")
public <Request extends PersistentTaskRequest> void startPersistentTask(String taskId, String taskName, Request request,
ActionListener<PersistentTask<Request>> listener) {
CreatePersistentTaskAction.Request createPersistentActionRequest =
new CreatePersistentTaskAction.Request(taskId, taskName, request);
public <Params extends PersistentTaskParams> void startPersistentTask(String taskId, String taskName, @Nullable Params params,
ActionListener<PersistentTask<Params>> listener) {
StartPersistentTaskAction.Request createPersistentActionRequest =
new StartPersistentTaskAction.Request(taskId, taskName, params);
try {
client.execute(CreatePersistentTaskAction.INSTANCE, createPersistentActionRequest, ActionListener.wrap(
o -> listener.onResponse((PersistentTask<Request>) o.getTask()), listener::onFailure));
client.execute(StartPersistentTaskAction.INSTANCE, createPersistentActionRequest, ActionListener.wrap(
o -> listener.onResponse((PersistentTask<Params>) o.getTask()), listener::onFailure));
} catch (Exception e) {
listener.onFailure(e);
}
@ -183,8 +183,8 @@ public class PersistentTasksService extends AbstractComponent {
}
}
public interface WaitForPersistentTaskStatusListener<Request extends PersistentTaskRequest>
extends ActionListener<PersistentTask<Request>> {
public interface WaitForPersistentTaskStatusListener<Params extends PersistentTaskParams>
extends ActionListener<PersistentTask<Params>> {
default void onTimeout(TimeValue timeout) {
onFailure(new IllegalStateException("timed out after " + timeout));
}

@ -11,6 +11,10 @@
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.persistent;
@ -27,6 +31,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -43,14 +48,14 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
* This action can be used to add the record for the persistent action to the cluster state.
*/
public class CreatePersistentTaskAction extends Action<CreatePersistentTaskAction.Request,
public class StartPersistentTaskAction extends Action<StartPersistentTaskAction.Request,
PersistentTaskResponse,
CreatePersistentTaskAction.RequestBuilder> {
StartPersistentTaskAction.RequestBuilder> {
public static final CreatePersistentTaskAction INSTANCE = new CreatePersistentTaskAction();
public static final String NAME = "cluster:admin/persistent/create";
public static final StartPersistentTaskAction INSTANCE = new StartPersistentTaskAction();
public static final String NAME = "cluster:admin/persistent/start";
private CreatePersistentTaskAction() {
private StartPersistentTaskAction() {
super(NAME);
}
@ -68,18 +73,19 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
private String taskId;
@Nullable
private String action;
private PersistentTaskRequest request;
private PersistentTaskParams params;
public Request() {
}
public Request(String taskId, String action, PersistentTaskRequest request) {
public Request(String taskId, String action, PersistentTaskParams params) {
this.taskId = taskId;
this.action = action;
this.request = request;
this.params = params;
}
@Override
@ -87,7 +93,7 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
super.readFrom(in);
taskId = in.readString();
action = in.readString();
request = in.readNamedWriteable(PersistentTaskRequest.class);
params = in.readOptionalNamedWriteable(PersistentTaskParams.class);
}
@Override
@ -95,7 +101,7 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
super.writeTo(out);
out.writeString(taskId);
out.writeString(action);
out.writeNamedWriteable(request);
out.writeOptionalNamedWriteable(params);
}
@Override
@ -107,9 +113,6 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
if (this.action == null) {
validationException = addValidationError("action must be specified", validationException);
}
if (this.request == null) {
validationException = addValidationError("request must be specified", validationException);
}
return validationException;
}
@ -119,12 +122,12 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
if (o == null || getClass() != o.getClass()) return false;
Request request1 = (Request) o;
return Objects.equals(taskId, request1.taskId) && Objects.equals(action, request1.action) &&
Objects.equals(request, request1.request);
Objects.equals(params, request1.params);
}
@Override
public int hashCode() {
return Objects.hash(taskId, action, request);
return Objects.hash(taskId, action, params);
}
public String getAction() {
@ -143,20 +146,21 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
this.taskId = taskId;
}
public PersistentTaskRequest getRequest() {
return request;
public PersistentTaskParams getParams() {
return params;
}
public void setRequest(PersistentTaskRequest request) {
this.request = request;
@Nullable
public void setParams(PersistentTaskParams params) {
this.params = params;
}
}
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<CreatePersistentTaskAction.Request,
PersistentTaskResponse, CreatePersistentTaskAction.RequestBuilder> {
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<StartPersistentTaskAction.Request,
PersistentTaskResponse, StartPersistentTaskAction.RequestBuilder> {
protected RequestBuilder(ElasticsearchClient client, CreatePersistentTaskAction action) {
protected RequestBuilder(ElasticsearchClient client, StartPersistentTaskAction action) {
super(client, action, new Request());
}
@ -170,8 +174,8 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
return this;
}
public RequestBuilder setRequest(PersistentTaskRequest persistentTaskRequest) {
request.setRequest(persistentTaskRequest);
public RequestBuilder setRequest(PersistentTaskParams params) {
request.setParams(params);
return this;
}
@ -188,7 +192,7 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
PersistentTasksExecutorRegistry persistentTasksExecutorRegistry,
PersistentTasksService persistentTasksService,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, CreatePersistentTaskAction.NAME, transportService, clusterService, threadPool, actionFilters,
super(settings, StartPersistentTaskAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.persistentTasksClusterService = persistentTasksClusterService;
NodePersistentTasksExecutor executor = new NodePersistentTasksExecutor(threadPool);
@ -215,7 +219,7 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
@Override
protected final void masterOperation(final Request request, ClusterState state,
final ActionListener<PersistentTaskResponse> listener) {
persistentTasksClusterService.createPersistentTask(request.taskId, request.action, request.request,
persistentTasksClusterService.createPersistentTask(request.taskId, request.action, request.params,
new ActionListener<PersistentTask<?>>() {
@Override

@ -35,7 +35,7 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestRequest;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
import java.util.ArrayList;
import java.util.Arrays;
@ -68,9 +68,9 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
assertThat(dumpEvent(event), PersistentTasksClusterService.reassignmentRequired(event,
new PersistentTasksClusterService.ExecutorNodeDecider() {
@Override
public <Request extends PersistentTaskRequest> Assignment getAssignment(
String action, ClusterState currentState, Request request) {
if ("never_assign".equals(((TestRequest) request).getTestParam())) {
public <Params extends PersistentTaskParams> Assignment getAssignment(
String action, ClusterState currentState, Params params) {
if ("never_assign".equals(((TestParams) params).getTestParam())) {
return NO_NODE_FOUND;
}
return randomNodeAssignment(currentState.nodes());
@ -185,10 +185,10 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
return PersistentTasksClusterService.reassignTasks(clusterState, logger,
new PersistentTasksClusterService.ExecutorNodeDecider() {
@Override
public <Request extends PersistentTaskRequest> Assignment getAssignment(
String action, ClusterState currentState, Request request) {
TestRequest testRequest = (TestRequest) request;
switch (testRequest.getTestParam()) {
public <Params extends PersistentTaskParams> Assignment getAssignment(
String action, ClusterState currentState, Params params) {
TestParams testParams = (TestParams) params;
switch (testParams.getTestParam()) {
case "assign_me":
return randomNodeAssignment(currentState.nodes());
case "dont_assign_me":
@ -199,7 +199,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
case "assign_one":
return assignOnlyOneTaskAtATime(currentState);
default:
fail("unknown param " + testRequest.getTestParam());
fail("unknown param " + testParams.getTestParam());
}
return NO_NODE_FOUND;
}
@ -293,7 +293,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(tasks);
for (PersistentTask<?> task : tasks.tasks()) {
// Remove all unassigned tasks that cause changing assignments they might trigger a significant change
if ("never_assign".equals(((TestRequest) task.getRequest()).getTestParam()) &&
if ("never_assign".equals(((TestParams) task.getParams()).getTestParam()) &&
"change me".equals(task.getAssignment().getExplanation())) {
logger.info("removed task with changing assignment {}", task.getId());
tasksBuilder.removeTask(task.getId());
@ -360,7 +360,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
logger.info("removed all unassigned tasks and changed routing table");
if (tasks != null) {
for (PersistentTask<?> task : tasks.tasks()) {
if (task.getExecutorNode() == null || "never_assign".equals(((TestRequest) task.getRequest()).getTestParam())) {
if (task.getExecutorNode() == null || "never_assign".equals(((TestParams) task.getParams()).getTestParam())) {
tasksBuilder.removeTask(task.getId());
}
}
@ -382,7 +382,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
}
return tasks.tasks().stream().anyMatch(task -> {
if (task.getExecutorNode() == null || discoveryNodes.nodeExists(task.getExecutorNode())) {
return "never_assign".equals(((TestRequest) task.getRequest()).getTestParam()) == false;
return "never_assign".equals(((TestParams) task.getParams()).getTestParam()) == false;
}
return false;
});
@ -404,11 +404,11 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
MetaData.Builder metaData, PersistentTasksCustomMetaData.Builder tasks,
Assignment assignment, String param) {
return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasksCustomMetaData.TYPE,
tasks.addTask(UUIDs.base64UUID(), randomAlphaOfLength(10), new TestRequest(param), assignment).build()));
tasks.addTask(UUIDs.base64UUID(), randomAlphaOfLength(10), new TestParams(param), assignment).build()));
}
private void addTask(PersistentTasksCustomMetaData.Builder tasks, String action, String param, String node) {
tasks.addTask(UUIDs.base64UUID(), action, new TestRequest(param), new Assignment(node, "explanation: " + action));
tasks.addTask(UUIDs.base64UUID(), action, new TestParams(param), new Assignment(node, "explanation: " + action));
}
private DiscoveryNode newNode(String nodeId) {

@ -41,7 +41,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Builder;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.Status;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestRequest;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
import java.io.IOException;
import java.util.ArrayList;
@ -60,7 +60,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder();
for (int i = 0; i < numberOfTasks; i++) {
String taskId = UUIDs.base64UUID();
tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, new TestRequest(randomAlphaOfLength(10)),
tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams(randomAlphaOfLength(10)),
randomAssignment());
if (randomBoolean()) {
// From time to time update status
@ -80,7 +80,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
return new NamedWriteableRegistry(Arrays.asList(
new Entry(MetaData.Custom.class, PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData::new),
new Entry(NamedDiff.class, PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData::readDiffFrom),
new Entry(PersistentTaskRequest.class, TestPersistentTasksExecutor.NAME, TestRequest::new),
new Entry(PersistentTaskParams.class, TestPersistentTasksExecutor.NAME, TestParams::new),
new Entry(Task.Status.class, Status.NAME, Status::new)
));
}
@ -151,7 +151,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
private String addRandomTask(Builder builder) {
String taskId = UUIDs.base64UUID();
builder.addTask(taskId, TestPersistentTasksExecutor.NAME, new TestRequest(randomAlphaOfLength(10)), randomAssignment());
builder.addTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams(randomAlphaOfLength(10)), randomAssignment());
return taskId;
}
@ -162,8 +162,8 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
@Override
protected NamedXContentRegistry xContentRegistry() {
return new NamedXContentRegistry(Arrays.asList(
new NamedXContentRegistry.Entry(PersistentTaskRequest.class, new ParseField(TestPersistentTasksExecutor.NAME),
TestRequest::fromXContent),
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(TestPersistentTasksExecutor.NAME),
TestParams::fromXContent),
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(Status.NAME), Status::fromXContent)
));
}
@ -188,14 +188,14 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
assertEquals(testInstance.tasks().size(), newInstance.tasks().size());
for (PersistentTask<?> testTask : testInstance.tasks()) {
PersistentTask<TestRequest> newTask = (PersistentTask<TestRequest>) newInstance.getTask(testTask.getId());
PersistentTask<TestParams> newTask = (PersistentTask<TestParams>) newInstance.getTask(testTask.getId());
assertNotNull(newTask);
// Things that should be serialized
assertEquals(testTask.getTaskName(), newTask.getTaskName());
assertEquals(testTask.getId(), newTask.getId());
assertEquals(testTask.getStatus(), newTask.getStatus());
assertEquals(testTask.getRequest(), newTask.getRequest());
assertEquals(testTask.getParams(), newTask.getParams());
// Things that shouldn't be serialized
assertEquals(0, newTask.getAllocationId());

@ -25,7 +25,7 @@ import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestRequest;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
import java.util.ArrayList;
import java.util.Collection;
@ -59,13 +59,14 @@ public class PersistentTasksExecutorFullRestartIT extends ESIntegTestCase {
PersistentTasksService service = internalCluster().getInstance(PersistentTasksService.class);
int numberOfTasks = randomIntBetween(1, 10);
String[] taskIds = new String[numberOfTasks];
List<PlainActionFuture<PersistentTask<TestRequest>>> futures = new ArrayList<>(numberOfTasks);
List<PlainActionFuture<PersistentTask<TestParams>>> futures = new ArrayList<>(numberOfTasks);
for (int i = 0; i < numberOfTasks; i++) {
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
futures.add(future);
taskIds[i] = UUIDs.base64UUID();
service.startPersistentTask(taskIds[i], TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
service.startPersistentTask(taskIds[i], TestPersistentTasksExecutor.NAME, randomBoolean() ? null : new TestParams("Blah"),
future);
}
for (int i = 0; i < numberOfTasks; i++) {

@ -33,7 +33,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask
import org.elasticsearch.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.Status;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestRequest;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestTasksRequestBuilder;
import org.junit.After;
@ -69,15 +69,15 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
assertNoRunningTasks();
}
public static class WaitForPersistentTaskStatusFuture<Request extends PersistentTaskRequest>
extends PlainActionFuture<PersistentTask<Request>>
implements WaitForPersistentTaskStatusListener<Request> {
public static class WaitForPersistentTaskStatusFuture<Params extends PersistentTaskParams>
extends PlainActionFuture<PersistentTask<Params>>
implements WaitForPersistentTaskStatusListener<Params> {
}
public void testPersistentActionFailure() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
long allocationId = future.get().getAllocationId();
assertBusy(() -> {
// Wait for the task to start
@ -106,29 +106,31 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
public void testPersistentActionCompletion() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
long taskId = future.get().getAllocationId();
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
String taskId = UUIDs.base64UUID();
persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
long allocationId = future.get().getAllocationId();
assertBusy(() -> {
// Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
.getTasks().size(), equalTo(1));
});
TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
.get().getTasks().get(0);
.setDetailed(true).get().getTasks().get(0);
logger.info("Found running task with id {} and parent {}", firstRunningTask.getId(), firstRunningTask.getParentTaskId());
// Verifying parent
assertThat(firstRunningTask.getParentTaskId().getId(), equalTo(taskId));
// Verifying parent and description
assertThat(firstRunningTask.getParentTaskId().getId(), equalTo(allocationId));
assertThat(firstRunningTask.getParentTaskId().getNodeId(), equalTo("cluster"));
assertThat(firstRunningTask.getDescription(), equalTo("id=" + taskId));
stopOrCancelTask(firstRunningTask.getTaskId());
}
public void testPersistentActionWithNoAvailableNode() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
TestRequest testRequest = new TestRequest("Blah");
testRequest.setExecutorNodeAttr("test");
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testRequest, future);
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
TestParams testParams = new TestParams("Blah");
testParams.setExecutorNodeAttr("test");
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, future);
String taskId = future.get().getId();
Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build();
@ -161,8 +163,8 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
public void testPersistentActionStatusUpdate() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
String taskId = future.get().getId();
assertBusy(() -> {
@ -219,13 +221,13 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
public void testCreatePersistentTaskWithDuplicateId() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
String taskId = UUIDs.base64UUID();
persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
future.get();
PlainActionFuture<PersistentTask<TestRequest>> future2 = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future2);
PlainActionFuture<PersistentTask<TestParams>> future2 = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future2);
assertThrows(future2, ResourceAlreadyExistsException.class);
assertBusy(() -> {

@ -33,8 +33,8 @@ public class PersistentTasksExecutorResponseTests extends AbstractStreamableTest
protected PersistentTaskResponse createTestInstance() {
if (randomBoolean()) {
return new PersistentTaskResponse(
new PersistentTask<PersistentTaskRequest>(UUIDs.base64UUID(), randomAsciiOfLength(10),
new TestPersistentTasksPlugin.TestRequest("test"),
new PersistentTask<PersistentTaskParams>(UUIDs.base64UUID(), randomAsciiOfLength(10),
new TestPersistentTasksPlugin.TestParams("test"),
randomLong(), PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT));
} else {
return new PersistentTaskResponse(null);
@ -49,7 +49,8 @@ public class PersistentTasksExecutorResponseTests extends AbstractStreamableTest
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(Collections.singletonList(
new NamedWriteableRegistry.Entry(PersistentTaskRequest.class, TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME, TestPersistentTasksPlugin.TestRequest::new)
new NamedWriteableRegistry.Entry(PersistentTaskParams.class,
TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME, TestPersistentTasksPlugin.TestParams::new)
));
}
}

@ -32,12 +32,13 @@ import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestRequest;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
import java.io.IOException;
import java.util.ArrayList;
@ -48,6 +49,10 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -73,12 +78,18 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
public void testStartTask() throws Exception {
ClusterService clusterService = createClusterService();
PersistentTasksService persistentTasksService = mock(PersistentTasksService.class);
@SuppressWarnings("unchecked") PersistentTasksExecutor<TestRequest> action = mock(PersistentTasksExecutor.class);
@SuppressWarnings("unchecked") PersistentTasksExecutor<TestParams> action = mock(PersistentTasksExecutor.class);
when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
when(action.getTaskName()).thenReturn("test");
int nonLocalNodesCount = randomInt(10);
// need to account for 5 original tasks on each node and their relocations
for (int i = 0; i < (nonLocalNodesCount + 1) * 10; i++) {
TaskId parentId = new TaskId("cluster", i);
when(action.createTask(anyLong(), anyString(), anyString(), eq(parentId), any())).thenReturn(
new TestPersistentTasksPlugin.TestTask(i, "persistent", "test", "", parentId));
}
PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Collections.singletonList(action));
int nonLocalNodesCount = randomInt(10);
MockExecutor executor = new MockExecutor();
PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService,
registry, new TaskManager(Settings.EMPTY), executor);
@ -90,11 +101,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
boolean added = false;
if (nonLocalNodesCount > 0) {
for (int i = 0; i < randomInt(5); i++) {
tasks.addTask(UUIDs.base64UUID(), "test_action", new TestRequest("other_" + i),
tasks.addTask(UUIDs.base64UUID(), "test_action", new TestParams("other_" + i),
new Assignment("other_node_" + randomInt(nonLocalNodesCount), "test assignment on other node"));
if (added == false && randomBoolean()) {
added = true;
tasks.addTask(UUIDs.base64UUID(), "test", new TestRequest("this_param"),
tasks.addTask(UUIDs.base64UUID(), "test", new TestParams("this_param"),
new Assignment("this_node", "test assignment on this node"));
}
}
@ -115,7 +126,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
// Add task on some other node
state = newClusterState;
newClusterState = addTask(state, "test", new TestRequest(), "some_other_node");
newClusterState = addTask(state, "test", null, "some_other_node");
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
// Make sure action wasn't called again
@ -123,7 +134,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
// Start another task on this node
state = newClusterState;
newClusterState = addTask(state, "test", new TestRequest("this_param"), "this_node");
newClusterState = addTask(state, "test", new TestParams("this_param"), "this_node");
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
// Make sure action was called this time
@ -138,7 +149,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
// Add task on some other node
state = newClusterState;
newClusterState = addTask(state, "test", new TestRequest(), "some_other_node");
newClusterState = addTask(state, "test", null, "some_other_node");
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
// Make sure action wasn't called again
@ -176,9 +187,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
fail("Shouldn't be called during Cluster State cancellation");
}
};
@SuppressWarnings("unchecked") PersistentTasksExecutor<TestRequest> action = mock(PersistentTasksExecutor.class);
@SuppressWarnings("unchecked") PersistentTasksExecutor<TestParams> action = mock(PersistentTasksExecutor.class);
when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
when(action.getTaskName()).thenReturn("test");
when(action.createTask(anyLong(), anyString(), anyString(), any(), any()))
.thenReturn(new TestPersistentTasksPlugin.TestTask(1, "persistent", "test", "", new TaskId("cluster", 1)));
PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Collections.singletonList(action));
int nonLocalNodesCount = randomInt(10);
@ -193,7 +206,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
ClusterState newClusterState = state;
// Allocate first task
state = newClusterState;
newClusterState = addTask(state, "test", new TestRequest(), "this_node");
newClusterState = addTask(state, "test", null, "this_node");
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
// Check the the task is know to the task manager
@ -236,12 +249,12 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
}
private <Request extends PersistentTaskRequest> ClusterState addTask(ClusterState state, String action, Request request,
private <Params extends PersistentTaskParams> ClusterState addTask(ClusterState state, String action, Params params,
String node) {
PersistentTasksCustomMetaData.Builder builder =
PersistentTasksCustomMetaData.builder(state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE,
builder.addTask(UUIDs.base64UUID(), action, request, new Assignment(node, "test assignment")).build())).build();
builder.addTask(UUIDs.base64UUID(), action, params, new Assignment(node, "test assignment")).build())).build();
}
private ClusterState reallocateTask(ClusterState state, String taskId, String node) {
@ -261,12 +274,12 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
}
private class Execution {
private final PersistentTaskRequest request;
private final PersistentTaskParams params;
private final AllocatedPersistentTask task;
private final PersistentTasksExecutor<?> holder;
Execution(PersistentTaskRequest request, AllocatedPersistentTask task, PersistentTasksExecutor<?> holder) {
this.request = request;
Execution(PersistentTaskParams params, AllocatedPersistentTask task, PersistentTasksExecutor<?> holder) {
this.params = params;
this.task = task;
this.holder = holder;
}
@ -280,9 +293,9 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
}
@Override
public <Request extends PersistentTaskRequest> void executeTask(Request request, AllocatedPersistentTask task,
PersistentTasksExecutor<Request> action) {
executions.add(new Execution(request, task, action));
public <Params extends PersistentTaskParams> void executeTask(Params params, AllocatedPersistentTask task,
PersistentTasksExecutor<Params> executor) {
executions.add(new Execution(params, task, executor));
}
public Execution get(int i) {

@ -21,9 +21,9 @@ package org.elasticsearch.persistent;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.elasticsearch.persistent.CreatePersistentTaskAction.Request;
import org.elasticsearch.persistent.StartPersistentTaskAction.Request;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestRequest;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
import org.elasticsearch.test.AbstractStreamableTestCase;
import java.util.Collections;
@ -32,17 +32,19 @@ public class StartPersistentActionRequestTests extends AbstractStreamableTestCas
@Override
protected Request createTestInstance() {
TestRequest testRequest = new TestRequest();
TestParams testParams;
if (randomBoolean()) {
testRequest.setTestParam(randomAlphaOfLengthBetween(1, 20));
testParams = new TestParams();
if (randomBoolean()) {
testParams.setTestParam(randomAlphaOfLengthBetween(1, 20));
}
if (randomBoolean()) {
testRequest.setParentTask(randomAlphaOfLengthBetween(1, 20), randomLong());
testParams.setExecutorNodeAttr(randomAlphaOfLengthBetween(1, 20));
}
if (randomBoolean()) {
testRequest.setExecutorNodeAttr(randomAlphaOfLengthBetween(1, 20));
} else {
testParams = null;
}
return new Request(UUIDs.base64UUID(), randomAlphaOfLengthBetween(1, 20), new TestRequest());
return new Request(UUIDs.base64UUID(), randomAlphaOfLengthBetween(1, 20), testParams);
}
@Override
@ -53,7 +55,7 @@ public class StartPersistentActionRequestTests extends AbstractStreamableTestCas
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(Collections.singletonList(
new Entry(PersistentTaskRequest.class, TestPersistentTasksExecutor.NAME, TestRequest::new)
new Entry(PersistentTaskParams.class, TestPersistentTasksExecutor.NAME, TestParams::new)
));
}
}

@ -22,7 +22,6 @@ package org.elasticsearch.persistent;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
@ -92,7 +91,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return Arrays.asList(
new ActionHandler<>(TestTaskAction.INSTANCE, TransportTestTaskAction.class),
new ActionHandler<>(CreatePersistentTaskAction.INSTANCE, CreatePersistentTaskAction.TransportAction.class),
new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class),
new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class),
new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class),
new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class)
@ -118,7 +117,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
@Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return Arrays.asList(
new NamedWriteableRegistry.Entry(PersistentTaskRequest.class, TestPersistentTasksExecutor.NAME, TestRequest::new),
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, TestPersistentTasksExecutor.NAME, TestParams::new),
new NamedWriteableRegistry.Entry(Task.Status.class,
PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new),
new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasksCustomMetaData.TYPE,
@ -134,16 +133,16 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
return Arrays.asList(
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(PersistentTasksCustomMetaData.TYPE),
PersistentTasksCustomMetaData::fromXContent),
new NamedXContentRegistry.Entry(PersistentTaskRequest.class, new ParseField(TestPersistentTasksExecutor.NAME),
TestRequest::fromXContent),
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(TestPersistentTasksExecutor.NAME),
TestParams::fromXContent),
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(Status.NAME), Status::fromXContent)
);
}
public static class TestRequest extends PersistentTaskRequest {
public static class TestParams implements PersistentTaskParams {
public static final ConstructingObjectParser<TestRequest, Void> REQUEST_PARSER =
new ConstructingObjectParser<>(TestPersistentTasksExecutor.NAME, args -> new TestRequest((String) args[0]));
public static final ConstructingObjectParser<TestParams, Void> REQUEST_PARSER =
new ConstructingObjectParser<>(TestPersistentTasksExecutor.NAME, args -> new TestParams((String) args[0]));
static {
REQUEST_PARSER.declareString(constructorArg(), new ParseField("param"));
@ -155,21 +154,18 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
private String testParam = null;
public TestRequest() {
public TestParams() {
}
public TestRequest(String testParam) {
public TestParams(String testParam) {
this.testParam = testParam;
}
public TestRequest(StreamInput in) throws IOException {
readFrom(in);
}
@Override
public ActionRequestValidationException validate() {
return null;
public TestParams(StreamInput in) throws IOException {
executorNodeAttr = in.readOptionalString();
responseNode = in.readOptionalString();
testParam = in.readOptionalString();
}
@Override
@ -195,20 +191,11 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(executorNodeAttr);
out.writeOptionalString(responseNode);
out.writeOptionalString(testParam);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
executorNodeAttr = in.readOptionalString();
responseNode = in.readOptionalString();
testParam = in.readOptionalString();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -217,7 +204,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
return builder;
}
public static TestRequest fromXContent(XContentParser parser) throws IOException {
public static TestParams fromXContent(XContentParser parser) throws IOException {
return REQUEST_PARSER.parse(parser, null);
}
@ -225,7 +212,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TestRequest that = (TestRequest) o;
TestParams that = (TestParams) o;
return Objects.equals(executorNodeAttr, that.executorNodeAttr) &&
Objects.equals(responseNode, that.responseNode) &&
Objects.equals(testParam, that.testParam);
@ -235,11 +222,6 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
public int hashCode() {
return Objects.hash(executorNodeAttr, responseNode, testParam);
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new TestTask(id, type, action, getDescription(), parentTaskId);
}
}
public static class Status implements Task.Status {
@ -312,7 +294,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
}
public static class TestPersistentTasksExecutor extends PersistentTasksExecutor<TestRequest> {
public static class TestPersistentTasksExecutor extends PersistentTasksExecutor<TestParams> {
public static final String NAME = "cluster:admin/persistent/test";
private final ClusterService clusterService;
@ -323,12 +305,12 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
}
@Override
public Assignment getAssignment(TestRequest request, ClusterState clusterState) {
if (request.getExecutorNodeAttr() == null) {
return super.getAssignment(request, clusterState);
public Assignment getAssignment(TestParams params, ClusterState clusterState) {
if (params == null || params.getExecutorNodeAttr() == null) {
return super.getAssignment(params, clusterState);
} else {
DiscoveryNode executorNode = selectLeastLoadedNode(clusterState,
discoveryNode -> request.getExecutorNodeAttr().equals(discoveryNode.getAttributes().get("test_attr")));
discoveryNode -> params.getExecutorNodeAttr().equals(discoveryNode.getAttributes().get("test_attr")));
if (executorNode != null) {
return new Assignment(executorNode.getId(), "test assignment");
} else {
@ -339,7 +321,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
}
@Override
protected void nodeOperation(AllocatedPersistentTask task, TestRequest request) {
protected void nodeOperation(AllocatedPersistentTask task, TestParams params) {
logger.info("started node operation for the task {}", task);
try {
TestTask testTask = (TestTask) task;
@ -399,6 +381,12 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
task.markAsFailed(e);
}
}
@Override
protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
PersistentTask<TestParams> task) {
return new TestTask(id, type, action, getDescription(task), parentTaskId);
}
}
public static class TestTaskAction extends Action<TestTasksRequest, TestTasksResponse, TestTasksRequestBuilder> {