Persistent Tasks: refactor PersistentTasksService to use ActionListener (#937)

PersistentTasksService methods are not using ActionListener<PersistentTask<?>> instead of PersistentTaskOperationListener.
This commit is contained in:
Igor Motov 2017-04-04 13:56:22 -04:00 committed by Martijn van Groningen
parent 97822dbea3
commit 5a8512bf4e
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
18 changed files with 183 additions and 541 deletions

View File

@ -70,7 +70,7 @@ public class AllocatedPersistentTask extends CancellableTask {
* *
* This doesn't affect the status of this allocated task. * This doesn't affect the status of this allocated task.
*/ */
public void updatePersistentStatus(Task.Status status, PersistentTasksService.PersistentTaskOperationListener listener) { public void updatePersistentStatus(Task.Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
persistentTasksService.updateStatus(persistentTaskId, allocationId, status, listener); persistentTasksService.updateStatus(persistentTaskId, allocationId, status, listener);
} }

View File

@ -22,7 +22,6 @@ import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.action.support.master.TransportMasterNodeAction;
@ -37,8 +36,8 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.io.IOException; import java.io.IOException;
import java.util.Objects; import java.util.Objects;
@ -48,7 +47,7 @@ import java.util.Objects;
* removed from the cluster state in case of successful completion or restarted on some other node in case of failure. * removed from the cluster state in case of successful completion or restarted on some other node in case of failure.
*/ */
public class CompletionPersistentTaskAction extends Action<CompletionPersistentTaskAction.Request, public class CompletionPersistentTaskAction extends Action<CompletionPersistentTaskAction.Request,
CompletionPersistentTaskAction.Response, PersistentTaskResponse,
CompletionPersistentTaskAction.RequestBuilder> { CompletionPersistentTaskAction.RequestBuilder> {
public static final CompletionPersistentTaskAction INSTANCE = new CompletionPersistentTaskAction(); public static final CompletionPersistentTaskAction INSTANCE = new CompletionPersistentTaskAction();
@ -64,8 +63,8 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
} }
@Override @Override
public Response newResponse() { public PersistentTaskResponse newResponse() {
return new Response(); return new PersistentTaskResponse();
} }
public static class Request extends MasterNodeRequest<Request> { public static class Request extends MasterNodeRequest<Request> {
@ -117,49 +116,15 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
} }
} }
public static class Response extends AcknowledgedResponse {
public Response() {
super();
}
public Response(boolean acknowledged) {
super(acknowledged);
}
@Override
public void readFrom(StreamInput in) throws IOException {
readAcknowledged(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
writeAcknowledged(out);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AcknowledgedResponse that = (AcknowledgedResponse) o;
return isAcknowledged() == that.isAcknowledged();
}
@Override
public int hashCode() {
return Objects.hash(isAcknowledged());
}
}
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<CompletionPersistentTaskAction.Request, public static class RequestBuilder extends MasterNodeOperationRequestBuilder<CompletionPersistentTaskAction.Request,
CompletionPersistentTaskAction.Response, CompletionPersistentTaskAction.RequestBuilder> { PersistentTaskResponse, CompletionPersistentTaskAction.RequestBuilder> {
protected RequestBuilder(ElasticsearchClient client, CompletionPersistentTaskAction action) { protected RequestBuilder(ElasticsearchClient client, CompletionPersistentTaskAction action) {
super(client, action, new Request()); super(client, action, new Request());
} }
} }
public static class TransportAction extends TransportMasterNodeAction<Request, Response> { public static class TransportAction extends TransportMasterNodeAction<Request, PersistentTaskResponse> {
private final PersistentTasksClusterService persistentTasksClusterService; private final PersistentTasksClusterService persistentTasksClusterService;
@ -179,8 +144,8 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
} }
@Override @Override
protected Response newResponse() { protected PersistentTaskResponse newResponse() {
return new Response(); return new PersistentTaskResponse();
} }
@Override @Override
@ -190,11 +155,13 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
} }
@Override @Override
protected final void masterOperation(final Request request, ClusterState state, final ActionListener<Response> listener) { protected final void masterOperation(final Request request, ClusterState state,
persistentTasksClusterService.completePersistentTask(request.taskId, request.exception, new ActionListener<Empty>() { final ActionListener<PersistentTaskResponse> listener) {
persistentTasksClusterService.completePersistentTask(request.taskId, request.exception,
new ActionListener<PersistentTask<?>>() {
@Override @Override
public void onResponse(Empty empty) { public void onResponse(PersistentTask<?> task) {
listener.onResponse(newResponse()); listener.onResponse(new PersistentTaskResponse(task));
} }
@Override @Override

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.io.IOException; import java.io.IOException;
import java.util.Objects; import java.util.Objects;
@ -194,10 +195,11 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
protected final void masterOperation(final Request request, ClusterState state, protected final void masterOperation(final Request request, ClusterState state,
final ActionListener<PersistentTaskResponse> listener) { final ActionListener<PersistentTaskResponse> listener) {
persistentTasksClusterService.createPersistentTask(request.action, request.request, persistentTasksClusterService.createPersistentTask(request.action, request.request,
new ActionListener<Long>() { new ActionListener<PersistentTask<?>>() {
@Override @Override
public void onResponse(Long newTaskId) { public void onResponse(PersistentTask<?> task) {
listener.onResponse(new PersistentTaskResponse(newTaskId)); listener.onResponse(new PersistentTaskResponse(task));
} }
@Override @Override

View File

@ -21,6 +21,7 @@ package org.elasticsearch.persistent;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.io.IOException; import java.io.IOException;
import java.util.Objects; import java.util.Objects;
@ -29,30 +30,30 @@ import java.util.Objects;
* Response upon a successful start or an persistent task * Response upon a successful start or an persistent task
*/ */
public class PersistentTaskResponse extends ActionResponse { public class PersistentTaskResponse extends ActionResponse {
private long taskId; private PersistentTask<?> task;
public PersistentTaskResponse() { public PersistentTaskResponse() {
super(); super();
} }
public PersistentTaskResponse(long taskId) { public PersistentTaskResponse(PersistentTask<?> task) {
this.taskId = taskId; this.task = task;
} }
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
taskId = in.readLong(); task = in.readOptionalWriteable(PersistentTask::new);
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeLong(taskId); out.writeOptionalWriteable(task);
} }
public long getTaskId() { public PersistentTask<?> getTask() {
return taskId; return task;
} }
@Override @Override
@ -60,11 +61,11 @@ public class PersistentTaskResponse extends ActionResponse {
if (this == o) return true; if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
PersistentTaskResponse that = (PersistentTaskResponse) o; PersistentTaskResponse that = (PersistentTaskResponse) o;
return taskId == that.taskId; return Objects.equals(task, that.task);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(taskId); return Objects.hash(task);
} }
} }

View File

@ -62,7 +62,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
* @param listener the listener that will be called when task is started * @param listener the listener that will be called when task is started
*/ */
public <Request extends PersistentTaskRequest> void createPersistentTask(String action, Request request, public <Request extends PersistentTaskRequest> void createPersistentTask(String action, Request request,
ActionListener<Long> listener) { ActionListener<PersistentTask<?>> listener) {
clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) throws Exception { public ClusterState execute(ClusterState currentState) throws Exception {
@ -77,10 +77,15 @@ public class PersistentTasksClusterService extends AbstractComponent implements
listener.onFailure(e); listener.onFailure(e);
} }
@SuppressWarnings("unchecked")
@Override @Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse( PersistentTasksCustomMetaData tasks = newState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
((PersistentTasksCustomMetaData) newState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)).getCurrentId()); if (tasks != null) {
listener.onResponse(tasks.getTask(tasks.getCurrentId()));
} else {
listener.onResponse(null);
}
} }
}); });
} }
@ -93,7 +98,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
* @param failure the reason for restarting the task or null if the task completed successfully * @param failure the reason for restarting the task or null if the task completed successfully
* @param listener the listener that will be called when task is removed * @param listener the listener that will be called when task is removed
*/ */
public void completePersistentTask(long id, Exception failure, ActionListener<Empty> listener) { public void completePersistentTask(long id, Exception failure, ActionListener<PersistentTask<?>> listener) {
final String source; final String source;
if (failure != null) { if (failure != null) {
logger.warn("persistent task " + id + " failed", failure); logger.warn("persistent task " + id + " failed", failure);
@ -122,38 +127,8 @@ public class PersistentTasksClusterService extends AbstractComponent implements
@Override @Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(Empty.INSTANCE); // Using old state since in the new state the task is already gone
} listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(oldState, id));
});
}
/**
* Switches the persistent task from stopped to started mode
*
* @param id the id of a persistent task
* @param listener the listener that will be called when task is removed
*/
public void startPersistentTask(long id, ActionListener<Empty> listener) {
clusterService.submitStateUpdateTask("start persistent task", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState);
if (tasksInProgress.hasTask(id)) {
return update(currentState, tasksInProgress
.assignTask(id, (action, request) -> getAssignement(action, currentState, request)));
} else {
throw new ResourceNotFoundException("the task with id {} doesn't exist", id);
}
}
@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(Empty.INSTANCE);
} }
}); });
} }
@ -164,7 +139,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
* @param id the id of a persistent task * @param id the id of a persistent task
* @param listener the listener that will be called when task is removed * @param listener the listener that will be called when task is removed
*/ */
public void removePersistentTask(long id, ActionListener<Empty> listener) { public void removePersistentTask(long id, ActionListener<PersistentTask<?>> listener) {
clusterService.submitStateUpdateTask("remove persistent task", new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask("remove persistent task", new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) throws Exception { public ClusterState execute(ClusterState currentState) throws Exception {
@ -183,7 +158,8 @@ public class PersistentTasksClusterService extends AbstractComponent implements
@Override @Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(Empty.INSTANCE); // Using old state since in the new state the task is already gone
listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(oldState, id));
} }
}); });
} }
@ -196,7 +172,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
* @param status new status * @param status new status
* @param listener the listener that will be called when task is removed * @param listener the listener that will be called when task is removed
*/ */
public void updatePersistentTaskStatus(long id, long allocationId, Task.Status status, ActionListener<Empty> listener) { public void updatePersistentTaskStatus(long id, long allocationId, Task.Status status, ActionListener<PersistentTask<?>> listener) {
clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) throws Exception { public ClusterState execute(ClusterState currentState) throws Exception {
@ -220,7 +196,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
@Override @Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(Empty.INSTANCE); listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(newState, id));
} }
}); });
} }

View File

@ -281,7 +281,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private PersistentTask(StreamInput in) throws IOException { public PersistentTask(StreamInput in) throws IOException {
id = in.readLong(); id = in.readLong();
allocationId = in.readLong(); allocationId = in.readLong();
taskName = in.readString(); taskName = in.readString();

View File

@ -24,9 +24,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.persistent.PersistentTasksService.PersistentTaskOperationListener;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import java.util.function.Predicate; import java.util.function.Predicate;
@ -39,14 +37,11 @@ public abstract class PersistentTasksExecutor<Request extends PersistentTaskRequ
private final String executor; private final String executor;
private final String taskName; private final String taskName;
private final PersistentTasksService persistentTasksService;
protected PersistentTasksExecutor(Settings settings, String taskName, PersistentTasksService persistentTasksService, protected PersistentTasksExecutor(Settings settings, String taskName, String executor) {
String executor) {
super(settings); super(settings);
this.taskName = taskName; this.taskName = taskName;
this.executor = executor; this.executor = executor;
this.persistentTasksService = persistentTasksService;
} }
public String getTaskName() { public String getTaskName() {

View File

@ -22,6 +22,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier; import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
@ -36,7 +37,6 @@ import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.persistent.PersistentTasksService.PersistentTaskOperationListener;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.io.IOException; import java.io.IOException;
@ -160,10 +160,10 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
AllocatedPersistentTask task = runningTasks.remove(persistentTaskId); AllocatedPersistentTask task = runningTasks.remove(persistentTaskId);
if (task != null) { if (task != null) {
if (task.markAsCancelled()) { if (task.markAsCancelled()) {
persistentTasksService.sendCancellation(task.getId(), new PersistentTaskOperationListener() { persistentTasksService.sendCancellation(task.getId(), new ActionListener<CancelTasksResponse>() {
@Override @Override
public void onResponse(long taskId) { public void onResponse(CancelTasksResponse cancelTasksResponse) {
logger.trace("Persistent task with id {} was cancelled", taskId); logger.trace("Persistent task with id {} was cancelled", task.getId());
} }
@ -253,7 +253,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
} }
} }
private class PublishedResponseListener implements PersistentTaskOperationListener { private class PublishedResponseListener implements ActionListener<PersistentTask<?>> {
private final AllocatedPersistentTask task; private final AllocatedPersistentTask task;
PublishedResponseListener(final AllocatedPersistentTask task) { PublishedResponseListener(final AllocatedPersistentTask task) {
@ -262,8 +262,8 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
@Override @Override
public void onResponse(long taskId) { public void onResponse(PersistentTask<?> persistentTask) {
logger.trace("notification for task {} was successful", task.getPersistentTaskId()); logger.trace("notification for task {} was successful", task.getId());
if (task.markAsNotified() == false) { if (task.markAsNotified() == false) {
logger.warn("attempt to mark task {} in the {} state as NOTIFIED", task.getPersistentTaskId(), task.getState()); logger.warn("attempt to mark task {} in the {} state as NOTIFIED", task.getPersistentTaskId(), task.getState());
} }

View File

@ -20,6 +20,7 @@ package org.elasticsearch.persistent;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
@ -54,16 +55,15 @@ public class PersistentTasksService extends AbstractComponent {
} }
/** /**
* Creates the specified persistent action. The action is started unless the stopped parameter is equal to true. * Creates the specified persistent task and attempts to assign it to a node.
* If removeOnCompletion parameter is equal to true, the task is removed from the cluster state upon completion.
* Otherwise it will remain there in the stopped state.
*/ */
public <Request extends PersistentTaskRequest> void createPersistentActionTask(String action, Request request, @SuppressWarnings("unchecked")
PersistentTaskOperationListener listener) { public <Request extends PersistentTaskRequest> void startPersistentTask(String taskName, Request request,
CreatePersistentTaskAction.Request createPersistentActionRequest = new CreatePersistentTaskAction.Request(action, request); ActionListener<PersistentTask<Request>> listener) {
CreatePersistentTaskAction.Request createPersistentActionRequest = new CreatePersistentTaskAction.Request(taskName, request);
try { try {
client.execute(CreatePersistentTaskAction.INSTANCE, createPersistentActionRequest, ActionListener.wrap( client.execute(CreatePersistentTaskAction.INSTANCE, createPersistentActionRequest, ActionListener.wrap(
o -> listener.onResponse(o.getTaskId()), listener::onFailure)); o -> listener.onResponse((PersistentTask<Request>) o.getTask()), listener::onFailure));
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
} }
@ -71,13 +71,12 @@ public class PersistentTasksService extends AbstractComponent {
/** /**
* Notifies the PersistentTasksClusterService about successful (failure == null) completion of a task or its failure * Notifies the PersistentTasksClusterService about successful (failure == null) completion of a task or its failure
*
*/ */
public void sendCompletionNotification(long taskId, Exception failure, PersistentTaskOperationListener listener) { public void sendCompletionNotification(long taskId, Exception failure, ActionListener<PersistentTask<?>> listener) {
CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, failure); CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, failure);
try { try {
client.execute(CompletionPersistentTaskAction.INSTANCE, restartRequest, ActionListener.wrap(o -> listener.onResponse(taskId), client.execute(CompletionPersistentTaskAction.INSTANCE, restartRequest,
listener::onFailure)); ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure));
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
} }
@ -86,14 +85,13 @@ public class PersistentTasksService extends AbstractComponent {
/** /**
* Cancels the persistent task. * Cancels the persistent task.
*/ */
public void sendCancellation(long taskId, PersistentTaskOperationListener listener) { void sendCancellation(long taskId, ActionListener<CancelTasksResponse> listener) {
DiscoveryNode localNode = clusterService.localNode(); DiscoveryNode localNode = clusterService.localNode();
CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(); CancelTasksRequest cancelTasksRequest = new CancelTasksRequest();
cancelTasksRequest.setTaskId(new TaskId(localNode.getId(), taskId)); cancelTasksRequest.setTaskId(new TaskId(localNode.getId(), taskId));
cancelTasksRequest.setReason("persistent action was removed"); cancelTasksRequest.setReason("persistent action was removed");
try { try {
client.admin().cluster().cancelTasks(cancelTasksRequest, ActionListener.wrap(o -> listener.onResponse(taskId), client.admin().cluster().cancelTasks(cancelTasksRequest, listener);
listener::onFailure));
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
} }
@ -101,28 +99,28 @@ public class PersistentTasksService extends AbstractComponent {
/** /**
* Updates status of the persistent task. * Updates status of the persistent task.
* * <p>
* Persistent task implementers shouldn't call this method directly and use * Persistent task implementers shouldn't call this method directly and use
* {@link AllocatedPersistentTask#updatePersistentStatus} instead * {@link AllocatedPersistentTask#updatePersistentStatus} instead
*/ */
void updateStatus(long taskId, long allocationId, Task.Status status, PersistentTaskOperationListener listener) { void updateStatus(long taskId, long allocationId, Task.Status status, ActionListener<PersistentTask<?>> listener) {
UpdatePersistentTaskStatusAction.Request updateStatusRequest = UpdatePersistentTaskStatusAction.Request updateStatusRequest =
new UpdatePersistentTaskStatusAction.Request(taskId, allocationId, status); new UpdatePersistentTaskStatusAction.Request(taskId, allocationId, status);
try { try {
client.execute(UpdatePersistentTaskStatusAction.INSTANCE, updateStatusRequest, ActionListener.wrap( client.execute(UpdatePersistentTaskStatusAction.INSTANCE, updateStatusRequest, ActionListener.wrap(
o -> listener.onResponse(taskId), listener::onFailure)); o -> listener.onResponse(o.getTask()), listener::onFailure));
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
} }
} }
/** /**
* Removes a persistent task * Cancels if needed and removes a persistent task
*/ */
public void removeTask(long taskId, PersistentTaskOperationListener listener) { public void cancelPersistentTask(long taskId, ActionListener<PersistentTask<?>> listener) {
RemovePersistentTaskAction.Request removeRequest = new RemovePersistentTaskAction.Request(taskId); RemovePersistentTaskAction.Request removeRequest = new RemovePersistentTaskAction.Request(taskId);
try { try {
client.execute(RemovePersistentTaskAction.INSTANCE, removeRequest, ActionListener.wrap(o -> listener.onResponse(taskId), client.execute(RemovePersistentTaskAction.INSTANCE, removeRequest, ActionListener.wrap(o -> listener.onResponse(o.getTask()),
listener::onFailure)); listener::onFailure));
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
@ -134,15 +132,15 @@ public class PersistentTasksService extends AbstractComponent {
* waits of it. * waits of it.
*/ */
public void waitForPersistentTaskStatus(long taskId, Predicate<PersistentTask<?>> predicate, @Nullable TimeValue timeout, public void waitForPersistentTaskStatus(long taskId, Predicate<PersistentTask<?>> predicate, @Nullable TimeValue timeout,
WaitForPersistentTaskStatusListener listener) { WaitForPersistentTaskStatusListener<?> listener) {
ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext()); ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext());
if (predicate.test(PersistentTasksCustomMetaData.getTaskWithId(stateObserver.setAndGetObservedState(), taskId))) { if (predicate.test(PersistentTasksCustomMetaData.getTaskWithId(stateObserver.setAndGetObservedState(), taskId))) {
listener.onResponse(taskId); listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(stateObserver.setAndGetObservedState(), taskId));
} else { } else {
stateObserver.waitForNextChange(new ClusterStateObserver.Listener() { stateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
@Override @Override
public void onNewClusterState(ClusterState state) { public void onNewClusterState(ClusterState state) {
listener.onResponse(taskId); listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(state, taskId));
} }
@Override @Override
@ -158,15 +156,10 @@ public class PersistentTasksService extends AbstractComponent {
} }
} }
public interface WaitForPersistentTaskStatusListener extends PersistentTaskOperationListener { public interface WaitForPersistentTaskStatusListener<Request extends PersistentTaskRequest>
extends ActionListener<PersistentTask<Request>> {
default void onTimeout(TimeValue timeout) { default void onTimeout(TimeValue timeout) {
onFailure(new IllegalStateException("timed out after " + timeout)); onFailure(new IllegalStateException("timed out after " + timeout));
} }
} }
public interface PersistentTaskOperationListener {
void onResponse(long taskId);
void onFailure(Exception e);
}
} }

View File

@ -39,12 +39,13 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.io.IOException; import java.io.IOException;
import java.util.Objects; import java.util.Objects;
public class RemovePersistentTaskAction extends Action<RemovePersistentTaskAction.Request, public class RemovePersistentTaskAction extends Action<RemovePersistentTaskAction.Request,
RemovePersistentTaskAction.Response, PersistentTaskResponse,
RemovePersistentTaskAction.RequestBuilder> { RemovePersistentTaskAction.RequestBuilder> {
public static final RemovePersistentTaskAction INSTANCE = new RemovePersistentTaskAction(); public static final RemovePersistentTaskAction INSTANCE = new RemovePersistentTaskAction();
@ -60,8 +61,8 @@ public class RemovePersistentTaskAction extends Action<RemovePersistentTaskActio
} }
@Override @Override
public Response newResponse() { public PersistentTaskResponse newResponse() {
return new Response(); return new PersistentTaskResponse();
} }
public static class Request extends MasterNodeRequest<Request> { public static class Request extends MasterNodeRequest<Request> {
@ -111,42 +112,8 @@ public class RemovePersistentTaskAction extends Action<RemovePersistentTaskActio
} }
} }
public static class Response extends AcknowledgedResponse {
public Response() {
super();
}
public Response(boolean acknowledged) {
super(acknowledged);
}
@Override
public void readFrom(StreamInput in) throws IOException {
readAcknowledged(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
writeAcknowledged(out);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AcknowledgedResponse that = (AcknowledgedResponse) o;
return isAcknowledged() == that.isAcknowledged();
}
@Override
public int hashCode() {
return Objects.hash(isAcknowledged());
}
}
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<RemovePersistentTaskAction.Request, public static class RequestBuilder extends MasterNodeOperationRequestBuilder<RemovePersistentTaskAction.Request,
RemovePersistentTaskAction.Response, RemovePersistentTaskAction.RequestBuilder> { PersistentTaskResponse, RemovePersistentTaskAction.RequestBuilder> {
protected RequestBuilder(ElasticsearchClient client, RemovePersistentTaskAction action) { protected RequestBuilder(ElasticsearchClient client, RemovePersistentTaskAction action) {
super(client, action, new Request()); super(client, action, new Request());
@ -159,7 +126,7 @@ public class RemovePersistentTaskAction extends Action<RemovePersistentTaskActio
} }
public static class TransportAction extends TransportMasterNodeAction<Request, Response> { public static class TransportAction extends TransportMasterNodeAction<Request, PersistentTaskResponse> {
private final PersistentTasksClusterService persistentTasksClusterService; private final PersistentTasksClusterService persistentTasksClusterService;
@ -179,8 +146,8 @@ public class RemovePersistentTaskAction extends Action<RemovePersistentTaskActio
} }
@Override @Override
protected Response newResponse() { protected PersistentTaskResponse newResponse() {
return new Response(); return new PersistentTaskResponse();
} }
@Override @Override
@ -190,11 +157,12 @@ public class RemovePersistentTaskAction extends Action<RemovePersistentTaskActio
} }
@Override @Override
protected final void masterOperation(final Request request, ClusterState state, final ActionListener<Response> listener) { protected final void masterOperation(final Request request, ClusterState state,
persistentTasksClusterService.removePersistentTask(request.taskId, new ActionListener<Empty>() { final ActionListener<PersistentTaskResponse> listener) {
persistentTasksClusterService.removePersistentTask(request.taskId, new ActionListener<PersistentTask<?>>() {
@Override @Override
public void onResponse(Empty empty) { public void onResponse(PersistentTask<?> task) {
listener.onResponse(new Response(true)); listener.onResponse(new PersistentTaskResponse(task));
} }
@Override @Override

View File

@ -1,212 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.persistent;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Objects;
/**
* This action can be used to start a persistent task previously created using {@link CreatePersistentTaskAction}
*/
public class StartPersistentTaskAction extends Action<StartPersistentTaskAction.Request,
StartPersistentTaskAction.Response,
StartPersistentTaskAction.RequestBuilder> {
public static final StartPersistentTaskAction INSTANCE = new StartPersistentTaskAction();
public static final String NAME = "cluster:admin/persistent/start";
private StartPersistentTaskAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends MasterNodeRequest<Request> {
private long taskId;
public Request() {
}
public Request(long taskId) {
this.taskId = taskId;
}
public void setTaskId(long taskId) {
this.taskId = taskId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
taskId = in.readLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(taskId);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return taskId == request.taskId;
}
@Override
public int hashCode() {
return Objects.hash(taskId);
}
}
public static class Response extends AcknowledgedResponse {
public Response() {
super();
}
public Response(boolean acknowledged) {
super(acknowledged);
}
@Override
public void readFrom(StreamInput in) throws IOException {
readAcknowledged(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
writeAcknowledged(out);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AcknowledgedResponse that = (AcknowledgedResponse) o;
return isAcknowledged() == that.isAcknowledged();
}
@Override
public int hashCode() {
return Objects.hash(isAcknowledged());
}
}
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<StartPersistentTaskAction.Request,
StartPersistentTaskAction.Response, StartPersistentTaskAction.RequestBuilder> {
protected RequestBuilder(ElasticsearchClient client, StartPersistentTaskAction action) {
super(client, action, new Request());
}
public final RequestBuilder setTaskId(long taskId) {
request.setTaskId(taskId);
return this;
}
}
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
private final PersistentTasksClusterService persistentTasksClusterService;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
PersistentTasksClusterService persistentTasksClusterService,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, StartPersistentTaskAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.persistentTasksClusterService = persistentTasksClusterService;
}
@Override
protected String executor() {
return ThreadPool.Names.MANAGEMENT;
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
// Cluster is not affected but we look up repositories in metadata
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
@Override
protected final void masterOperation(final Request request, ClusterState state, final ActionListener<Response> listener) {
persistentTasksClusterService.startPersistentTask(request.taskId, new ActionListener<Empty>() {
@Override
public void onResponse(Empty empty) {
listener.onResponse(new Response(true));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
}
}

View File

@ -40,12 +40,13 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.io.IOException; import java.io.IOException;
import java.util.Objects; import java.util.Objects;
public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTaskStatusAction.Request, public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTaskStatusAction.Request,
UpdatePersistentTaskStatusAction.Response, PersistentTaskResponse,
UpdatePersistentTaskStatusAction.RequestBuilder> { UpdatePersistentTaskStatusAction.RequestBuilder> {
public static final UpdatePersistentTaskStatusAction INSTANCE = new UpdatePersistentTaskStatusAction(); public static final UpdatePersistentTaskStatusAction INSTANCE = new UpdatePersistentTaskStatusAction();
@ -61,8 +62,8 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
} }
@Override @Override
public Response newResponse() { public PersistentTaskResponse newResponse() {
return new Response(); return new PersistentTaskResponse();
} }
public static class Request extends MasterNodeRequest<Request> { public static class Request extends MasterNodeRequest<Request> {
@ -129,42 +130,8 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
} }
} }
public static class Response extends AcknowledgedResponse {
public Response() {
super();
}
public Response(boolean acknowledged) {
super(acknowledged);
}
@Override
public void readFrom(StreamInput in) throws IOException {
readAcknowledged(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
writeAcknowledged(out);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AcknowledgedResponse that = (AcknowledgedResponse) o;
return isAcknowledged() == that.isAcknowledged();
}
@Override
public int hashCode() {
return Objects.hash(isAcknowledged());
}
}
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<UpdatePersistentTaskStatusAction.Request, public static class RequestBuilder extends MasterNodeOperationRequestBuilder<UpdatePersistentTaskStatusAction.Request,
UpdatePersistentTaskStatusAction.Response, UpdatePersistentTaskStatusAction.RequestBuilder> { PersistentTaskResponse, UpdatePersistentTaskStatusAction.RequestBuilder> {
protected RequestBuilder(ElasticsearchClient client, UpdatePersistentTaskStatusAction action) { protected RequestBuilder(ElasticsearchClient client, UpdatePersistentTaskStatusAction action) {
super(client, action, new Request()); super(client, action, new Request());
@ -182,7 +149,7 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
} }
public static class TransportAction extends TransportMasterNodeAction<Request, Response> { public static class TransportAction extends TransportMasterNodeAction<Request, PersistentTaskResponse> {
private final PersistentTasksClusterService persistentTasksClusterService; private final PersistentTasksClusterService persistentTasksClusterService;
@ -202,8 +169,8 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
} }
@Override @Override
protected Response newResponse() { protected PersistentTaskResponse newResponse() {
return new Response(); return new PersistentTaskResponse();
} }
@Override @Override
@ -213,12 +180,13 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
} }
@Override @Override
protected final void masterOperation(final Request request, ClusterState state, final ActionListener<Response> listener) { protected final void masterOperation(final Request request, ClusterState state,
final ActionListener<PersistentTaskResponse> listener) {
persistentTasksClusterService.updatePersistentTaskStatus(request.taskId, request.allocationId, request.status, persistentTasksClusterService.updatePersistentTaskStatus(request.taskId, request.allocationId, request.status,
new ActionListener<Empty>() { new ActionListener<PersistentTask<?>>() {
@Override @Override
public void onResponse(Empty empty) { public void onResponse(PersistentTask<?> task) {
listener.onResponse(new Response(true)); listener.onResponse(new PersistentTaskResponse(task));
} }
@Override @Override

View File

@ -1,35 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.persistent;
import org.elasticsearch.persistent.RemovePersistentTaskAction.Response;
import org.elasticsearch.test.AbstractStreamableTestCase;
public class CancelPersistentTaskResponseTests extends AbstractStreamableTestCase<Response> {
@Override
protected Response createTestInstance() {
return new Response(randomBoolean());
}
@Override
protected Response createBlankInstance() {
return new Response();
}
}

View File

@ -18,10 +18,10 @@
*/ */
package org.elasticsearch.persistent; package org.elasticsearch.persistent;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.persistent.PersistentTasksExecutorIT.PersistentTaskOperationFuture;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestRequest; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestRequest;
@ -58,16 +58,16 @@ public class PersistentTasksExecutorFullRestartIT extends ESIntegTestCase {
PersistentTasksService service = internalCluster().getInstance(PersistentTasksService.class); PersistentTasksService service = internalCluster().getInstance(PersistentTasksService.class);
int numberOfTasks = randomIntBetween(1, 10); int numberOfTasks = randomIntBetween(1, 10);
long[] taskIds = new long[numberOfTasks]; long[] taskIds = new long[numberOfTasks];
List<PersistentTaskOperationFuture> futures = new ArrayList<>(numberOfTasks); List<PlainActionFuture<PersistentTask<TestRequest>>> futures = new ArrayList<>(numberOfTasks);
for (int i = 0; i < numberOfTasks; i++) { for (int i = 0; i < numberOfTasks; i++) {
PersistentTaskOperationFuture future = new PersistentTaskOperationFuture(); PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
futures.add(future); futures.add(future);
service.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); service.startPersistentTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
} }
for (int i = 0; i < numberOfTasks; i++) { for (int i = 0; i < numberOfTasks; i++) {
taskIds[i] = futures.get(i).get(); taskIds[i] = futures.get(i).get().getId();
} }
PersistentTasksCustomMetaData tasksInProgress = internalCluster().clusterService().state().getMetaData() PersistentTasksCustomMetaData tasksInProgress = internalCluster().clusterService().state().getMetaData()

View File

@ -27,6 +27,7 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener; import org.elasticsearch.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.Status; import org.elasticsearch.persistent.TestPersistentTasksPlugin.Status;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
@ -66,18 +67,16 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
assertNoRunningTasks(); assertNoRunningTasks();
} }
public static class PersistentTaskOperationFuture extends PlainActionFuture<Long> implements WaitForPersistentTaskStatusListener { public static class WaitForPersistentTaskStatusFuture<Request extends PersistentTaskRequest>
@Override extends PlainActionFuture<PersistentTask<Request>>
public void onResponse(long taskId) { implements WaitForPersistentTaskStatusListener<Request> {
set(taskId);
}
} }
public void testPersistentActionFailure() throws Exception { public void testPersistentActionFailure() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PersistentTaskOperationFuture future = new PersistentTaskOperationFuture(); PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); persistentTasksService.startPersistentTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
long taskId = future.get(); long taskId = future.get().getId();
assertBusy(() -> { assertBusy(() -> {
// Wait for the task to start // Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get() assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
@ -105,9 +104,9 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
public void testPersistentActionCompletion() throws Exception { public void testPersistentActionCompletion() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PersistentTaskOperationFuture future = new PersistentTaskOperationFuture(); PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); persistentTasksService.startPersistentTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
long taskId = future.get(); long taskId = future.get().getId();
assertBusy(() -> { assertBusy(() -> {
// Wait for the task to start // Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get() assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
@ -124,11 +123,11 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
public void testPersistentActionWithNoAvailableNode() throws Exception { public void testPersistentActionWithNoAvailableNode() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PersistentTaskOperationFuture future = new PersistentTaskOperationFuture(); PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
TestRequest testRequest = new TestRequest("Blah"); TestRequest testRequest = new TestRequest("Blah");
testRequest.setExecutorNodeAttr("test"); testRequest.setExecutorNodeAttr("test");
persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, testRequest, future); persistentTasksService.startPersistentTask(TestPersistentTasksExecutor.NAME, testRequest, future);
long taskId = future.get(); long taskId = future.get().getId();
Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build(); Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build();
String newNode = internalCluster().startNode(nodeSettings); String newNode = internalCluster().startNode(nodeSettings);
@ -153,16 +152,16 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
}); });
// Remove the persistent task // Remove the persistent task
PersistentTaskOperationFuture removeFuture = new PersistentTaskOperationFuture(); PlainActionFuture<PersistentTask<?>> removeFuture = new PlainActionFuture<>();
persistentTasksService.removeTask(taskId, removeFuture); persistentTasksService.cancelPersistentTask(taskId, removeFuture);
assertEquals(removeFuture.get(), (Long) taskId); assertEquals(removeFuture.get().getId(), taskId);
} }
public void testPersistentActionStatusUpdate() throws Exception { public void testPersistentActionStatusUpdate() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PersistentTaskOperationFuture future = new PersistentTaskOperationFuture(); PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); persistentTasksService.startPersistentTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
long taskId = future.get(); long taskId = future.get().getId();
assertBusy(() -> { assertBusy(() -> {
// Wait for the task to start // Wait for the task to start
@ -185,27 +184,27 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
.get().getTasks().size(), equalTo(1)); .get().getTasks().size(), equalTo(1));
int finalI = i; int finalI = i;
PersistentTaskOperationFuture future1 = new PersistentTaskOperationFuture(); WaitForPersistentTaskStatusFuture<?> future1 = new WaitForPersistentTaskStatusFuture<>();
persistentTasksService.waitForPersistentTaskStatus(taskId, persistentTasksService.waitForPersistentTaskStatus(taskId,
task -> task != null && task.isCurrentStatus() && task.getStatus().toString() != null && task -> task != null && task.isCurrentStatus() && task.getStatus().toString() != null &&
task.getStatus().toString().equals("{\"phase\":\"phase " + (finalI + 1) + "\"}"), task.getStatus().toString().equals("{\"phase\":\"phase " + (finalI + 1) + "\"}"),
TimeValue.timeValueSeconds(10), future1); TimeValue.timeValueSeconds(10), future1);
assertThat(future1.get(), equalTo(taskId)); assertThat(future1.get().getId(), equalTo(taskId));
} }
PersistentTaskOperationFuture future1 = new PersistentTaskOperationFuture(); WaitForPersistentTaskStatusFuture<?> future1 = new WaitForPersistentTaskStatusFuture<>();
persistentTasksService.waitForPersistentTaskStatus(taskId, persistentTasksService.waitForPersistentTaskStatus(taskId,
task -> false, TimeValue.timeValueMillis(10), future1); task -> false, TimeValue.timeValueMillis(10), future1);
assertThrows(future1, IllegalStateException.class, "timed out after 10ms"); assertThrows(future1, IllegalStateException.class, "timed out after 10ms");
PersistentTaskOperationFuture failedUpdateFuture = new PersistentTaskOperationFuture(); PlainActionFuture<PersistentTask<?>> failedUpdateFuture = new PlainActionFuture<>();
persistentTasksService.updateStatus(taskId, -1, new Status("should fail"), failedUpdateFuture); persistentTasksService.updateStatus(taskId, -1, new Status("should fail"), failedUpdateFuture);
assertThrows(failedUpdateFuture, ResourceNotFoundException.class, "the task with id " + taskId + assertThrows(failedUpdateFuture, ResourceNotFoundException.class, "the task with id " + taskId +
" and allocation id -1 doesn't exist"); " and allocation id -1 doesn't exist");
// Wait for the task to disappear // Wait for the task to disappear
PersistentTaskOperationFuture future2 = new PersistentTaskOperationFuture(); WaitForPersistentTaskStatusFuture<?> future2 = new WaitForPersistentTaskStatusFuture<>();
persistentTasksService.waitForPersistentTaskStatus(taskId, Objects::isNull, TimeValue.timeValueSeconds(10), future2); persistentTasksService.waitForPersistentTaskStatus(taskId, Objects::isNull, TimeValue.timeValueSeconds(10), future2);
logger.info("Completing the running task"); logger.info("Completing the running task");
@ -213,7 +212,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
assertThat(new TestTasksRequestBuilder(client()).setOperation("finish").setTaskId(firstRunningTask.getTaskId()) assertThat(new TestTasksRequestBuilder(client()).setOperation("finish").setTaskId(firstRunningTask.getTaskId())
.get().getTasks().size(), equalTo(1)); .get().getTasks().size(), equalTo(1));
assertThat(future2.get(), equalTo(taskId)); assertThat(future2.get(), nullValue());
} }

View File

@ -18,17 +18,37 @@
*/ */
package org.elasticsearch.persistent; package org.elasticsearch.persistent;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.test.AbstractStreamableTestCase; import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.util.Collections;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiOfLength;
public class PersistentTasksExecutorResponseTests extends AbstractStreamableTestCase<PersistentTaskResponse> { public class PersistentTasksExecutorResponseTests extends AbstractStreamableTestCase<PersistentTaskResponse> {
@Override @Override
protected PersistentTaskResponse createTestInstance() { protected PersistentTaskResponse createTestInstance() {
return new PersistentTaskResponse(randomLong()); if (randomBoolean()) {
return new PersistentTaskResponse(
new PersistentTask<PersistentTaskRequest>(randomLong(), randomAsciiOfLength(10),
new TestPersistentTasksPlugin.TestRequest("test"),
PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT));
} else {
return new PersistentTaskResponse(null);
}
} }
@Override @Override
protected PersistentTaskResponse createBlankInstance() { protected PersistentTaskResponse createBlankInstance() {
return new PersistentTaskResponse(); return new PersistentTaskResponse();
} }
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(Collections.singletonList(
new NamedWriteableRegistry.Entry(PersistentTaskRequest.class, TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME, TestPersistentTasksPlugin.TestRequest::new)
));
}
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.persistent;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
@ -35,8 +36,8 @@ import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.persistent.PersistentTasksService.PersistentTaskOperationListener;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestRequest; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestRequest;
import java.io.IOException; import java.io.IOException;
@ -163,10 +164,10 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
public void testTaskCancellation() { public void testTaskCancellation() {
ClusterService clusterService = createClusterService(); ClusterService clusterService = createClusterService();
AtomicLong capturedTaskId = new AtomicLong(); AtomicLong capturedTaskId = new AtomicLong();
AtomicReference<PersistentTaskOperationListener> capturedListener = new AtomicReference<>(); AtomicReference<ActionListener<CancelTasksResponse>> capturedListener = new AtomicReference<>();
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, null, null, null) { PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, null, null, null) {
@Override @Override
public void sendCancellation(long taskId, PersistentTaskOperationListener listener) { public void sendCancellation(long taskId, ActionListener<CancelTasksResponse> listener) {
capturedTaskId.set(taskId); capturedTaskId.set(taskId);
capturedListener.set(listener); capturedListener.set(listener);
} }
@ -217,7 +218,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
// That should trigger cancellation request // That should trigger cancellation request
assertThat(capturedTaskId.get(), equalTo(localId)); assertThat(capturedTaskId.get(), equalTo(localId));
// Notify successful cancellation // Notify successful cancellation
capturedListener.get().onResponse(localId); capturedListener.get().onResponse(new CancelTasksResponse());
// finish or fail task // finish or fail task
if (randomBoolean()) { if (randomBoolean()) {
@ -240,11 +241,12 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
ClusterService clusterService = createClusterService(); ClusterService clusterService = createClusterService();
AtomicLong capturedTaskId = new AtomicLong(-1L); AtomicLong capturedTaskId = new AtomicLong(-1L);
AtomicReference<Exception> capturedException = new AtomicReference<>(); AtomicReference<Exception> capturedException = new AtomicReference<>();
AtomicReference<PersistentTaskOperationListener> capturedListener = new AtomicReference<>(); AtomicReference<ActionListener<PersistentTask<?>>> capturedListener = new AtomicReference<>();
PersistentTasksService persistentTasksService = PersistentTasksService persistentTasksService =
new PersistentTasksService(Settings.EMPTY, clusterService, null, null) { new PersistentTasksService(Settings.EMPTY, clusterService, null, null) {
@Override @Override
public void sendCompletionNotification(long taskId, Exception failure, PersistentTaskOperationListener listener) { public void sendCompletionNotification(long taskId, Exception failure,
ActionListener<PersistentTask<?>> listener) {
capturedTaskId.set(taskId); capturedTaskId.set(taskId);
capturedException.set(failure); capturedException.set(failure);
capturedListener.set(listener); capturedListener.set(listener);
@ -297,7 +299,8 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
long id = taskManager.getTasks().values().iterator().next().getParentTaskId().getId(); long id = taskManager.getTasks().values().iterator().next().getParentTaskId().getId();
// This time acknowledge notification // This time acknowledge notification
capturedListener.get().onResponse(id); capturedListener.get().onResponse(
new PersistentTask<>(1, TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME, new TestRequest(), null));
// Reallocate failed task to another node // Reallocate failed task to another node
state = newClusterState; state = newClusterState;

View File

@ -63,7 +63,7 @@ import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.persistent.PersistentTasksService.PersistentTaskOperationListener; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.security.InternalClient; import org.elasticsearch.security.InternalClient;
import java.io.IOException; import java.io.IOException;
@ -94,7 +94,6 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
return Arrays.asList( return Arrays.asList(
new ActionHandler<>(TestTaskAction.INSTANCE, TransportTestTaskAction.class), new ActionHandler<>(TestTaskAction.INSTANCE, TransportTestTaskAction.class),
new ActionHandler<>(CreatePersistentTaskAction.INSTANCE, CreatePersistentTaskAction.TransportAction.class), new ActionHandler<>(CreatePersistentTaskAction.INSTANCE, CreatePersistentTaskAction.TransportAction.class),
new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class),
new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class), new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class),
new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class), new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class),
new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class) new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class)
@ -107,8 +106,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
NamedXContentRegistry xContentRegistry) { NamedXContentRegistry xContentRegistry) {
InternalClient internalClient = new InternalClient(Settings.EMPTY, threadPool, client); InternalClient internalClient = new InternalClient(Settings.EMPTY, threadPool, client);
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, threadPool, internalClient); PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, threadPool, internalClient);
TestPersistentTasksExecutor testPersistentAction = new TestPersistentTasksExecutor(Settings.EMPTY, persistentTasksService, TestPersistentTasksExecutor testPersistentAction = new TestPersistentTasksExecutor(Settings.EMPTY, clusterService);
clusterService);
PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY, PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY,
Collections.singletonList(testPersistentAction)); Collections.singletonList(testPersistentAction));
return Arrays.asList( return Arrays.asList(
@ -320,9 +318,8 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
public static final String NAME = "cluster:admin/persistent/test"; public static final String NAME = "cluster:admin/persistent/test";
private final ClusterService clusterService; private final ClusterService clusterService;
public TestPersistentTasksExecutor(Settings settings, PersistentTasksService persistentTasksService, public TestPersistentTasksExecutor(Settings settings, ClusterService clusterService) {
ClusterService clusterService) { super(settings, NAME, ThreadPool.Names.GENERIC);
super(settings, NAME, persistentTasksService, ThreadPool.Names.GENERIC);
this.clusterService = clusterService; this.clusterService = clusterService;
} }
@ -368,9 +365,9 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
Status status = new Status("phase " + phase.incrementAndGet()); Status status = new Status("phase " + phase.incrementAndGet());
logger.info("updating the task status to {}", status); logger.info("updating the task status to {}", status);
task.updatePersistentStatus(status, new PersistentTaskOperationListener() { task.updatePersistentStatus(status, new ActionListener<PersistentTask<?>>() {
@Override @Override
public void onResponse(long taskId) { public void onResponse(PersistentTask<?> persistentTask) {
logger.info("updating was successful"); logger.info("updating was successful");
latch.countDown(); latch.countDown();
} }