Rename methods in PersistentTasksService (#30837)

This commit renames methods in the PersistentTasksService, to 
make obvious that the methods send requests in order to change 
the state of persistent tasks. 

Relates to #29608.
This commit is contained in:
Tanguy Leroux 2018-05-30 09:20:14 +02:00 committed by GitHub
parent b5c997b10b
commit a0af0e7f1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 181 additions and 173 deletions

View File

@ -103,12 +103,16 @@ public class AllocatedPersistentTask extends CancellableTask {
} }
/** /**
* Waits for this persistent task to have the desired state. * Waits for a given persistent task to comply with a given predicate, then call back the listener accordingly.
*
* @param predicate the persistent task predicate to evaluate
* @param timeout a timeout for waiting
* @param listener the callback listener
*/ */
public void waitForPersistentTaskStatus(Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> predicate, public void waitForPersistentTask(final Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> predicate,
@Nullable TimeValue timeout, final @Nullable TimeValue timeout,
PersistentTasksService.WaitForPersistentTaskStatusListener<?> listener) { final PersistentTasksService.WaitForPersistentTaskListener<?> listener) {
persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, predicate, timeout, listener); persistentTasksService.waitForPersistentTaskCondition(persistentTaskId, predicate, timeout, listener);
} }
final boolean isCompleted() { final boolean isCompleted() {
@ -143,7 +147,7 @@ public class AllocatedPersistentTask extends CancellableTask {
this.failure = failure; this.failure = failure;
if (prevState == State.STARTED) { if (prevState == State.STARTED) {
logger.trace("sending notification for completed task [{}] with id [{}]", getAction(), getPersistentTaskId()); logger.trace("sending notification for completed task [{}] with id [{}]", getAction(), getPersistentTaskId());
persistentTasksService.sendCompletionNotification(getPersistentTaskId(), getAllocationId(), failure, new persistentTasksService.sendCompletionRequest(getPersistentTaskId(), getAllocationId(), failure, new
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() { ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
@Override @Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) { public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {

View File

@ -196,7 +196,8 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
AllocatedPersistentTask task = runningTasks.remove(allocationId); AllocatedPersistentTask task = runningTasks.remove(allocationId);
if (task.markAsCancelled()) { if (task.markAsCancelled()) {
// Cancel the local task using the task manager // Cancel the local task using the task manager
persistentTasksService.sendTaskManagerCancellation(task.getId(), new ActionListener<CancelTasksResponse>() { String reason = "task has been removed, cancelling locally";
persistentTasksService.sendCancelRequest(task.getId(), reason, new ActionListener<CancelTasksResponse>() {
@Override @Override
public void onResponse(CancelTasksResponse cancelTasksResponse) { public void onResponse(CancelTasksResponse cancelTasksResponse) {
logger.trace("Persistent task [{}] with id [{}] and allocation id [{}] was cancelled", task.getAction(), logger.trace("Persistent task [{}] with id [{}] and allocation id [{}] was cancelled", task.getAction(),

View File

@ -22,14 +22,12 @@ import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
@ -37,20 +35,24 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.util.function.BiConsumer;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.function.Supplier; import java.util.function.Supplier;
/** /**
* This service is used by persistent actions to propagate changes in the action state and notify about completion * This service is used by persistent tasks and allocated persistent tasks to communicate changes
* to the master node so that the master can update the cluster state and can track of the states
* of the persistent tasks.
*/ */
public class PersistentTasksService extends AbstractComponent { public class PersistentTasksService extends AbstractComponent {
private static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin";
private static final String PERSISTENT_TASK_ORIGIN = "persistent_tasks";
private final Client client; private final Client client;
private final ClusterService clusterService; private final ClusterService clusterService;
private final ThreadPool threadPool; private final ThreadPool threadPool;
@ -63,92 +65,115 @@ public class PersistentTasksService extends AbstractComponent {
} }
/** /**
* Creates the specified persistent task and attempts to assign it to a node. * Notifies the master node to create new persistent task and to assign it to a node.
*/ */
public <Params extends PersistentTaskParams> void sendStartRequest(final String taskId,
final String taskName,
final @Nullable Params taskParams,
final ActionListener<PersistentTask<Params>> listener) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <Params extends PersistentTaskParams> void startPersistentTask(String taskId, String taskName, @Nullable Params params, final ActionListener<PersistentTask<?>> wrappedListener =
ActionListener<PersistentTask<Params>> listener) { ActionListener.wrap(t -> listener.onResponse((PersistentTask<Params>) t), listener::onFailure);
StartPersistentTaskAction.Request createPersistentActionRequest = StartPersistentTaskAction.Request request = new StartPersistentTaskAction.Request(taskId, taskName, taskParams);
new StartPersistentTaskAction.Request(taskId, taskName, params); execute(request, StartPersistentTaskAction.INSTANCE, wrappedListener);
try {
executeAsyncWithOrigin(client, PERSISTENT_TASK_ORIGIN, StartPersistentTaskAction.INSTANCE, createPersistentActionRequest,
ActionListener.wrap(o -> listener.onResponse((PersistentTask<Params>) o.getTask()), listener::onFailure));
} catch (Exception e) {
listener.onFailure(e);
}
} }
/** /**
* Notifies the PersistentTasksClusterService about successful (failure == null) completion of a task or its failure * Notifies the master node about the completion of a persistent task.
* <p>
* When {@code failure} is {@code null}, the persistent task is considered as successfully completed.
*/ */
public void sendCompletionNotification(String taskId, long allocationId, Exception failure, public void sendCompletionRequest(final String taskId,
ActionListener<PersistentTask<?>> listener) { final long taskAllocationId,
CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, allocationId, failure); final @Nullable Exception taskFailure,
try { final ActionListener<PersistentTask<?>> listener) {
executeAsyncWithOrigin(client, PERSISTENT_TASK_ORIGIN, CompletionPersistentTaskAction.INSTANCE, restartRequest, CompletionPersistentTaskAction.Request request = new CompletionPersistentTaskAction.Request(taskId, taskAllocationId, taskFailure);
ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure)); execute(request, CompletionPersistentTaskAction.INSTANCE, listener);
} catch (Exception e) {
listener.onFailure(e);
}
} }
/** /**
* Cancels a locally running task using the task manager * Cancels a locally running task using the Task Manager API
*/ */
void sendTaskManagerCancellation(long taskId, ActionListener<CancelTasksResponse> listener) { void sendCancelRequest(final long taskId, final String reason, final ActionListener<CancelTasksResponse> listener) {
DiscoveryNode localNode = clusterService.localNode(); CancelTasksRequest request = new CancelTasksRequest();
CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(); request.setTaskId(new TaskId(clusterService.localNode().getId(), taskId));
cancelTasksRequest.setTaskId(new TaskId(localNode.getId(), taskId)); request.setReason(reason);
cancelTasksRequest.setReason("persistent action was removed");
try { try {
executeAsyncWithOrigin(client.threadPool().getThreadContext(), PERSISTENT_TASK_ORIGIN, cancelTasksRequest, listener, final ThreadContext threadContext = client.threadPool().getThreadContext();
client.admin().cluster()::cancelTasks); final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, PERSISTENT_TASK_ORIGIN)) {
client.admin().cluster().cancelTasks(request, new ContextPreservingActionListener<>(supplier, listener));
}
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
} }
} }
/** /**
* Updates status of the persistent task. * Notifies the master node that the state of a persistent task has changed.
* <p> * <p>
* Persistent task implementers shouldn't call this method directly and use * Persistent task implementers shouldn't call this method directly and use
* {@link AllocatedPersistentTask#updatePersistentStatus} instead * {@link AllocatedPersistentTask#updatePersistentStatus} instead
*/ */
void updateStatus(String taskId, long allocationId, Task.Status status, ActionListener<PersistentTask<?>> listener) { void updateStatus(final String taskId,
UpdatePersistentTaskStatusAction.Request updateStatusRequest = final long taskAllocationID,
new UpdatePersistentTaskStatusAction.Request(taskId, allocationId, status); final Task.Status status,
final ActionListener<PersistentTask<?>> listener) {
UpdatePersistentTaskStatusAction.Request request = new UpdatePersistentTaskStatusAction.Request(taskId, taskAllocationID, status);
execute(request, UpdatePersistentTaskStatusAction.INSTANCE, listener);
}
/**
* Notifies the master node to remove a persistent task from the cluster state
*/
public void sendRemoveRequest(final String taskId, final ActionListener<PersistentTask<?>> listener) {
RemovePersistentTaskAction.Request request = new RemovePersistentTaskAction.Request(taskId);
execute(request, RemovePersistentTaskAction.INSTANCE, listener);
}
/**
* Executes an asynchronous persistent task action using the client.
* <p>
* The origin is set in the context and the listener is wrapped to ensure the proper context is restored
*/
private <Req extends ActionRequest, Resp extends PersistentTaskResponse, Builder extends ActionRequestBuilder<Req, Resp, Builder>>
void execute(final Req request, final Action<Req, Resp, Builder> action, final ActionListener<PersistentTask<?>> listener) {
try { try {
executeAsyncWithOrigin(client, PERSISTENT_TASK_ORIGIN, UpdatePersistentTaskStatusAction.INSTANCE, updateStatusRequest, final ThreadContext threadContext = client.threadPool().getThreadContext();
ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure)); final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, PERSISTENT_TASK_ORIGIN)) {
client.execute(action, request,
new ContextPreservingActionListener<>(supplier,
ActionListener.wrap(r -> listener.onResponse(r.getTask()), listener::onFailure)));
}
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
} }
} }
/** /**
* Cancels if needed and removes a persistent task * Waits for a given persistent task to comply with a given predicate, then call back the listener accordingly.
*
* @param taskId the persistent task id
* @param predicate the persistent task predicate to evaluate
* @param timeout a timeout for waiting
* @param listener the callback listener
*/ */
public void cancelPersistentTask(String taskId, ActionListener<PersistentTask<?>> listener) { public void waitForPersistentTaskCondition(final String taskId,
RemovePersistentTaskAction.Request removeRequest = new RemovePersistentTaskAction.Request(taskId); final Predicate<PersistentTask<?>> predicate,
try { final @Nullable TimeValue timeout,
executeAsyncWithOrigin(client, PERSISTENT_TASK_ORIGIN, RemovePersistentTaskAction.INSTANCE, removeRequest, final WaitForPersistentTaskListener<?> listener) {
ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure)); final Predicate<ClusterState> clusterStatePredicate = clusterState ->
} catch (Exception e) { predicate.test(PersistentTasksCustomMetaData.getTaskWithId(clusterState, taskId));
listener.onFailure(e);
}
}
/** final ClusterStateObserver observer = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext());
* Checks if the persistent task with giving id (taskId) has the desired state and if it doesn't final ClusterState clusterState = observer.setAndGetObservedState();
* waits of it. if (clusterStatePredicate.test(clusterState)) {
*/ listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(clusterState, taskId));
public void waitForPersistentTaskStatus(String taskId, Predicate<PersistentTask<?>> predicate, @Nullable TimeValue timeout,
WaitForPersistentTaskStatusListener<?> listener) {
ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext());
if (predicate.test(PersistentTasksCustomMetaData.getTaskWithId(stateObserver.setAndGetObservedState(), taskId))) {
listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(stateObserver.setAndGetObservedState(), taskId));
} else { } else {
stateObserver.waitForNextChange(new ClusterStateObserver.Listener() { observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override @Override
public void onNewClusterState(ClusterState state) { public void onNewClusterState(ClusterState state) {
listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(state, taskId)); listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(state, taskId));
@ -163,18 +188,28 @@ public class PersistentTasksService extends AbstractComponent {
public void onTimeout(TimeValue timeout) { public void onTimeout(TimeValue timeout) {
listener.onTimeout(timeout); listener.onTimeout(timeout);
} }
}, clusterState -> predicate.test(PersistentTasksCustomMetaData.getTaskWithId(clusterState, taskId))); }, clusterStatePredicate);
} }
} }
public void waitForPersistentTasksStatus(Predicate<PersistentTasksCustomMetaData> predicate, /**
@Nullable TimeValue timeout, ActionListener<Boolean> listener) { * Waits for persistent tasks to comply with a given predicate, then call back the listener accordingly.
ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, *
logger, threadPool.getThreadContext()); * @param predicate the predicate to evaluate
if (predicate.test(stateObserver.setAndGetObservedState().metaData().custom(PersistentTasksCustomMetaData.TYPE))) { * @param timeout a timeout for waiting
* @param listener the callback listener
*/
public void waitForPersistentTasksCondition(final Predicate<PersistentTasksCustomMetaData> predicate,
final @Nullable TimeValue timeout,
final ActionListener<Boolean> listener) {
final Predicate<ClusterState> clusterStatePredicate = clusterState ->
predicate.test(clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE));
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext());
if (clusterStatePredicate.test(observer.setAndGetObservedState())) {
listener.onResponse(true); listener.onResponse(true);
} else { } else {
stateObserver.waitForNextChange(new ClusterStateObserver.Listener() { observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override @Override
public void onNewClusterState(ClusterState state) { public void onNewClusterState(ClusterState state) {
listener.onResponse(true); listener.onResponse(true);
@ -187,45 +222,15 @@ public class PersistentTasksService extends AbstractComponent {
@Override @Override
public void onTimeout(TimeValue timeout) { public void onTimeout(TimeValue timeout) {
listener.onFailure(new IllegalStateException("timed out after " + timeout)); listener.onFailure(new IllegalStateException("Timed out when waiting for persistent tasks after " + timeout));
} }
}, clusterState -> predicate.test(clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE)), timeout); }, clusterStatePredicate, timeout);
} }
} }
public interface WaitForPersistentTaskStatusListener<Params extends PersistentTaskParams> public interface WaitForPersistentTaskListener<P extends PersistentTaskParams> extends ActionListener<PersistentTask<P>> {
extends ActionListener<PersistentTask<Params>> {
default void onTimeout(TimeValue timeout) { default void onTimeout(TimeValue timeout) {
onFailure(new IllegalStateException("timed out after " + timeout)); onFailure(new IllegalStateException("Timed out when waiting for persistent task after " + timeout));
}
}
private static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin";
private static final String PERSISTENT_TASK_ORIGIN = "persistent_tasks";
/**
* Executes a consumer after setting the origin and wrapping the listener so that the proper context is restored
*/
public static <Request extends ActionRequest, Response extends ActionResponse> void executeAsyncWithOrigin(
ThreadContext threadContext, String origin, Request request, ActionListener<Response> listener,
BiConsumer<Request, ActionListener<Response>> consumer) {
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, origin)) {
consumer.accept(request, new ContextPreservingActionListener<>(supplier, listener));
}
}
/**
* Executes an asynchronous action using the provided client. The origin is set in the context and the listener
* is wrapped to ensure the proper context is restored
*/
public static <Request extends ActionRequest, Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void executeAsyncWithOrigin(
Client client, String origin, Action<Request, Response, RequestBuilder> action, Request request,
ActionListener<Response> listener) {
final ThreadContext threadContext = client.threadPool().getThreadContext();
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, origin)) {
client.execute(action, request, new ContextPreservingActionListener<>(supplier, listener));
} }
} }
@ -234,5 +239,4 @@ public class PersistentTasksService extends AbstractComponent {
threadContext.putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin); threadContext.putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin);
return storedContext; return storedContext;
} }
} }

View File

@ -65,8 +65,7 @@ public class PersistentTasksExecutorFullRestartIT extends ESIntegTestCase {
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>(); PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
futures.add(future); futures.add(future);
taskIds[i] = UUIDs.base64UUID(); taskIds[i] = UUIDs.base64UUID();
service.startPersistentTask(taskIds[i], TestPersistentTasksExecutor.NAME, randomBoolean() ? null : new TestParams("Blah"), service.sendStartRequest(taskIds[i], TestPersistentTasksExecutor.NAME, randomBoolean() ? null : new TestParams("Blah"), future);
future);
} }
for (int i = 0; i < numberOfTasks; i++) { for (int i = 0; i < numberOfTasks; i++) {

View File

@ -30,7 +30,7 @@ import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener; import org.elasticsearch.persistent.PersistentTasksService.WaitForPersistentTaskListener;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.Status; import org.elasticsearch.persistent.TestPersistentTasksPlugin.Status;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
@ -69,15 +69,15 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
assertNoRunningTasks(); assertNoRunningTasks();
} }
public static class WaitForPersistentTaskStatusFuture<Params extends PersistentTaskParams> public static class WaitForPersistentTaskFuture<Params extends PersistentTaskParams>
extends PlainActionFuture<PersistentTask<Params>> extends PlainActionFuture<PersistentTask<Params>>
implements WaitForPersistentTaskStatusListener<Params> { implements WaitForPersistentTaskListener<Params> {
} }
public void testPersistentActionFailure() throws Exception { public void testPersistentActionFailure() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>(); PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future); persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
long allocationId = future.get().getAllocationId(); long allocationId = future.get().getAllocationId();
assertBusy(() -> { assertBusy(() -> {
// Wait for the task to start // Wait for the task to start
@ -108,7 +108,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>(); PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
String taskId = UUIDs.base64UUID(); String taskId = UUIDs.base64UUID();
persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future); persistentTasksService.sendStartRequest(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
long allocationId = future.get().getAllocationId(); long allocationId = future.get().getAllocationId();
assertBusy(() -> { assertBusy(() -> {
// Wait for the task to start // Wait for the task to start
@ -127,7 +127,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
logger.info("Simulating errant completion notification"); logger.info("Simulating errant completion notification");
//try sending completion request with incorrect allocation id //try sending completion request with incorrect allocation id
PlainActionFuture<PersistentTask<?>> failedCompletionNotificationFuture = new PlainActionFuture<>(); PlainActionFuture<PersistentTask<?>> failedCompletionNotificationFuture = new PlainActionFuture<>();
persistentTasksService.sendCompletionNotification(taskId, Long.MAX_VALUE, null, failedCompletionNotificationFuture); persistentTasksService.sendCompletionRequest(taskId, Long.MAX_VALUE, null, failedCompletionNotificationFuture);
assertThrows(failedCompletionNotificationFuture, ResourceNotFoundException.class); assertThrows(failedCompletionNotificationFuture, ResourceNotFoundException.class);
// Make sure that the task is still running // Make sure that the task is still running
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]") assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
@ -142,7 +142,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>(); PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
TestParams testParams = new TestParams("Blah"); TestParams testParams = new TestParams("Blah");
testParams.setExecutorNodeAttr("test"); testParams.setExecutorNodeAttr("test");
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, future); persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, future);
String taskId = future.get().getId(); String taskId = future.get().getId();
Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build(); Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build();
@ -169,14 +169,14 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
// Remove the persistent task // Remove the persistent task
PlainActionFuture<PersistentTask<?>> removeFuture = new PlainActionFuture<>(); PlainActionFuture<PersistentTask<?>> removeFuture = new PlainActionFuture<>();
persistentTasksService.cancelPersistentTask(taskId, removeFuture); persistentTasksService.sendRemoveRequest(taskId, removeFuture);
assertEquals(removeFuture.get().getId(), taskId); assertEquals(removeFuture.get().getId(), taskId);
} }
public void testPersistentActionStatusUpdate() throws Exception { public void testPersistentActionStatusUpdate() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>(); PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future); persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
String taskId = future.get().getId(); String taskId = future.get().getId();
assertBusy(() -> { assertBusy(() -> {
@ -200,16 +200,16 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
.get().getTasks().size(), equalTo(1)); .get().getTasks().size(), equalTo(1));
int finalI = i; int finalI = i;
WaitForPersistentTaskStatusFuture<?> future1 = new WaitForPersistentTaskStatusFuture<>(); WaitForPersistentTaskFuture<?> future1 = new WaitForPersistentTaskFuture<>();
persistentTasksService.waitForPersistentTaskStatus(taskId, persistentTasksService.waitForPersistentTaskCondition(taskId,
task -> task != null && task.getStatus() != null && task.getStatus().toString() != null && task -> task != null && task.getStatus() != null && task.getStatus().toString() != null &&
task.getStatus().toString().equals("{\"phase\":\"phase " + (finalI + 1) + "\"}"), task.getStatus().toString().equals("{\"phase\":\"phase " + (finalI + 1) + "\"}"),
TimeValue.timeValueSeconds(10), future1); TimeValue.timeValueSeconds(10), future1);
assertThat(future1.get().getId(), equalTo(taskId)); assertThat(future1.get().getId(), equalTo(taskId));
} }
WaitForPersistentTaskStatusFuture<?> future1 = new WaitForPersistentTaskStatusFuture<>(); WaitForPersistentTaskFuture<?> future1 = new WaitForPersistentTaskFuture<>();
persistentTasksService.waitForPersistentTaskStatus(taskId, persistentTasksService.waitForPersistentTaskCondition(taskId,
task -> false, TimeValue.timeValueMillis(10), future1); task -> false, TimeValue.timeValueMillis(10), future1);
assertThrows(future1, IllegalStateException.class, "timed out after 10ms"); assertThrows(future1, IllegalStateException.class, "timed out after 10ms");
@ -220,8 +220,8 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
" and allocation id -2 doesn't exist"); " and allocation id -2 doesn't exist");
// Wait for the task to disappear // Wait for the task to disappear
WaitForPersistentTaskStatusFuture<?> future2 = new WaitForPersistentTaskStatusFuture<>(); WaitForPersistentTaskFuture<?> future2 = new WaitForPersistentTaskFuture<>();
persistentTasksService.waitForPersistentTaskStatus(taskId, Objects::isNull, TimeValue.timeValueSeconds(10), future2); persistentTasksService.waitForPersistentTaskCondition(taskId, Objects::isNull, TimeValue.timeValueSeconds(10), future2);
logger.info("Completing the running task"); logger.info("Completing the running task");
// Complete the running task and make sure it finishes properly // Complete the running task and make sure it finishes properly
@ -235,11 +235,11 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>(); PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
String taskId = UUIDs.base64UUID(); String taskId = UUIDs.base64UUID();
persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future); persistentTasksService.sendStartRequest(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
future.get(); future.get();
PlainActionFuture<PersistentTask<TestParams>> future2 = new PlainActionFuture<>(); PlainActionFuture<PersistentTask<TestParams>> future2 = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future2); persistentTasksService.sendStartRequest(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future2);
assertThrows(future2, ResourceAlreadyExistsException.class); assertThrows(future2, ResourceAlreadyExistsException.class);
assertBusy(() -> { assertBusy(() -> {

View File

@ -235,14 +235,14 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
AtomicReference<ActionListener<CancelTasksResponse>> capturedListener = new AtomicReference<>(); AtomicReference<ActionListener<CancelTasksResponse>> capturedListener = new AtomicReference<>();
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, null, null, null) { PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, null, null, null) {
@Override @Override
public void sendTaskManagerCancellation(long taskId, ActionListener<CancelTasksResponse> listener) { void sendCancelRequest(final long taskId, final String reason, final ActionListener<CancelTasksResponse> listener) {
capturedTaskId.set(taskId); capturedTaskId.set(taskId);
capturedListener.set(listener); capturedListener.set(listener);
} }
@Override @Override
public void sendCompletionNotification(String taskId, long allocationId, Exception failure, public void sendCompletionRequest(final String taskId, final long taskAllocationId,
ActionListener<PersistentTask<?>> listener) { final Exception taskFailure, final ActionListener<PersistentTask<?>> listener) {
fail("Shouldn't be called during Cluster State cancellation"); fail("Shouldn't be called during Cluster State cancellation");
} }
}; };

View File

@ -71,7 +71,7 @@ public class EnableAssignmentDeciderIT extends ESIntegTestCase {
final CountDownLatch latch = new CountDownLatch(numberOfTasks); final CountDownLatch latch = new CountDownLatch(numberOfTasks);
for (int i = 0; i < numberOfTasks; i++) { for (int i = 0; i < numberOfTasks; i++) {
PersistentTasksService service = internalCluster().getInstance(PersistentTasksService.class); PersistentTasksService service = internalCluster().getInstance(PersistentTasksService.class);
service.startPersistentTask("task_" + i, TestPersistentTasksExecutor.NAME, randomTaskParams(), service.sendStartRequest("task_" + i, TestPersistentTasksExecutor.NAME, randomTaskParams(),
new ActionListener<PersistentTask<PersistentTaskParams>>() { new ActionListener<PersistentTask<PersistentTaskParams>>() {
@Override @Override
public void onResponse(PersistentTask<PersistentTaskParams> task) { public void onResponse(PersistentTask<PersistentTaskParams> task) {

View File

@ -315,7 +315,7 @@ public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJ
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks); PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
if (jobTask != null) { if (jobTask != null) {
auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING); auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING);
persistentTasksService.cancelPersistentTask(jobTask.getId(), persistentTasksService.sendRemoveRequest(jobTask.getId(),
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() { new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
@Override @Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> task) { public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> task) {
@ -400,7 +400,7 @@ public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJ
// so wait for that to happen here. // so wait for that to happen here.
void waitForJobClosed(CloseJobAction.Request request, WaitForCloseRequest waitForCloseRequest, CloseJobAction.Response response, void waitForJobClosed(CloseJobAction.Request request, WaitForCloseRequest waitForCloseRequest, CloseJobAction.Response response,
ActionListener<CloseJobAction.Response> listener) { ActionListener<CloseJobAction.Response> listener) {
persistentTasksService.waitForPersistentTasksStatus(persistentTasksCustomMetaData -> { persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetaData -> {
for (String persistentTaskId : waitForCloseRequest.persistentTaskIds) { for (String persistentTaskId : waitForCloseRequest.persistentTaskIds) {
if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) { if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) {
return false; return false;

View File

@ -90,7 +90,7 @@ public class TransportDeleteDatafeedAction extends TransportMasterNodeAction<Del
if (datafeedTask == null) { if (datafeedTask == null) {
listener.onResponse(true); listener.onResponse(true);
} else { } else {
persistentTasksService.cancelPersistentTask(datafeedTask.getId(), persistentTasksService.sendRemoveRequest(datafeedTask.getId(),
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() { new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
@Override @Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) { public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {

View File

@ -182,7 +182,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
if (jobTask == null) { if (jobTask == null) {
listener.onResponse(null); listener.onResponse(null);
} else { } else {
persistentTasksService.cancelPersistentTask(jobTask.getId(), persistentTasksService.sendRemoveRequest(jobTask.getId(),
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() { new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
@Override @Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> task) { public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> task) {

View File

@ -465,7 +465,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
// Step 4. Start job task // Step 4. Start job task
ActionListener<PutJobAction.Response> establishedMemoryUpdateListener = ActionListener.wrap( ActionListener<PutJobAction.Response> establishedMemoryUpdateListener = ActionListener.wrap(
response -> persistentTasksService.startPersistentTask(MlMetadata.jobTaskId(jobParams.getJobId()), response -> persistentTasksService.sendStartRequest(MlMetadata.jobTaskId(jobParams.getJobId()),
OpenJobAction.TASK_NAME, jobParams, finalListener), OpenJobAction.TASK_NAME, jobParams, finalListener),
listener::onFailure listener::onFailure
); );
@ -518,8 +518,8 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
private void waitForJobStarted(String taskId, OpenJobAction.JobParams jobParams, ActionListener<OpenJobAction.Response> listener) { private void waitForJobStarted(String taskId, OpenJobAction.JobParams jobParams, ActionListener<OpenJobAction.Response> listener) {
JobPredicate predicate = new JobPredicate(); JobPredicate predicate = new JobPredicate();
persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, jobParams.getTimeout(), persistentTasksService.waitForPersistentTaskCondition(taskId, predicate, jobParams.getTimeout(),
new PersistentTasksService.WaitForPersistentTaskStatusListener<OpenJobAction.JobParams>() { new PersistentTasksService.WaitForPersistentTaskListener<OpenJobAction.JobParams>() {
@Override @Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams> persistentTask) { public void onResponse(PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams> persistentTask) {
if (predicate.exception != null) { if (predicate.exception != null) {
@ -550,7 +550,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
private void cancelJobStart(PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams> persistentTask, Exception exception, private void cancelJobStart(PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams> persistentTask, Exception exception,
ActionListener<OpenJobAction.Response> listener) { ActionListener<OpenJobAction.Response> listener) {
persistentTasksService.cancelPersistentTask(persistentTask.getId(), persistentTasksService.sendRemoveRequest(persistentTask.getId(),
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() { new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
@Override @Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> task) { public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> task) {

View File

@ -137,7 +137,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
Job job = mlMetadata.getJobs().get(datafeed.getJobId()); Job job = mlMetadata.getJobs().get(datafeed.getJobId());
DataExtractorFactory.create(client, datafeed, job, ActionListener.wrap( DataExtractorFactory.create(client, datafeed, job, ActionListener.wrap(
dataExtractorFactory -> dataExtractorFactory ->
persistentTasksService.startPersistentTask(MLMetadataField.datafeedTaskId(params.getDatafeedId()), persistentTasksService.sendStartRequest(MLMetadataField.datafeedTaskId(params.getDatafeedId()),
StartDatafeedAction.TASK_NAME, params, finalListener) StartDatafeedAction.TASK_NAME, params, finalListener)
, listener::onFailure)); , listener::onFailure));
} else { } else {
@ -156,8 +156,8 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
private void waitForDatafeedStarted(String taskId, StartDatafeedAction.DatafeedParams params, private void waitForDatafeedStarted(String taskId, StartDatafeedAction.DatafeedParams params,
ActionListener<StartDatafeedAction.Response> listener) { ActionListener<StartDatafeedAction.Response> listener) {
DatafeedPredicate predicate = new DatafeedPredicate(); DatafeedPredicate predicate = new DatafeedPredicate();
persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, params.getTimeout(), persistentTasksService.waitForPersistentTaskCondition(taskId, predicate, params.getTimeout(),
new PersistentTasksService.WaitForPersistentTaskStatusListener<StartDatafeedAction.DatafeedParams>() { new PersistentTasksService.WaitForPersistentTaskListener<StartDatafeedAction.DatafeedParams>() {
@Override @Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask) { public void onResponse(PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask) {
if (predicate.exception != null) { if (predicate.exception != null) {
@ -184,7 +184,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
private void cancelDatafeedStart(PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask, private void cancelDatafeedStart(PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask,
Exception exception, ActionListener<StartDatafeedAction.Response> listener) { Exception exception, ActionListener<StartDatafeedAction.Response> listener) {
persistentTasksService.cancelPersistentTask(persistentTask.getId(), persistentTasksService.sendRemoveRequest(persistentTask.getId(),
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() { new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
@Override @Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> task) { public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> task) {

View File

@ -190,7 +190,7 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
for (String datafeedId : startedDatafeeds) { for (String datafeedId : startedDatafeeds) {
PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks); PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks);
if (datafeedTask != null) { if (datafeedTask != null) {
persistentTasksService.cancelPersistentTask(datafeedTask.getId(), persistentTasksService.sendRemoveRequest(datafeedTask.getId(),
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() { new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
@Override @Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) { public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
@ -275,7 +275,7 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
void waitForDatafeedStopped(List<String> datafeedPersistentTaskIds, StopDatafeedAction.Request request, void waitForDatafeedStopped(List<String> datafeedPersistentTaskIds, StopDatafeedAction.Request request,
StopDatafeedAction.Response response, StopDatafeedAction.Response response,
ActionListener<StopDatafeedAction.Response> listener) { ActionListener<StopDatafeedAction.Response> listener) {
persistentTasksService.waitForPersistentTasksStatus(persistentTasksCustomMetaData -> { persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetaData -> {
for (String persistentTaskId: datafeedPersistentTaskIds) { for (String persistentTaskId: datafeedPersistentTaskIds) {
if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) { if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) {
return false; return false;

View File

@ -49,7 +49,7 @@ import java.util.function.Supplier;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener; import static org.elasticsearch.persistent.PersistentTasksService.WaitForPersistentTaskListener;
public class DatafeedManager extends AbstractComponent { public class DatafeedManager extends AbstractComponent {
@ -391,8 +391,8 @@ public class DatafeedManager extends AbstractComponent {
return; return;
} }
task.waitForPersistentTaskStatus(Objects::isNull, TimeValue.timeValueSeconds(20), task.waitForPersistentTask(Objects::isNull, TimeValue.timeValueSeconds(20),
new WaitForPersistentTaskStatusListener<StartDatafeedAction.DatafeedParams>() { new WaitForPersistentTaskListener<StartDatafeedAction.DatafeedParams>() {
@Override @Override
public void onResponse(PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask) { public void onResponse(PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask) {
CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(getJobId()); CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(getJobId());

View File

@ -59,14 +59,14 @@ public class TransportDeleteRollupJobAction
TimeValue timeout = new TimeValue(60, TimeUnit.SECONDS); // TODO make this a config option TimeValue timeout = new TimeValue(60, TimeUnit.SECONDS); // TODO make this a config option
// Step 1. Cancel the persistent task // Step 1. Cancel the persistent task
persistentTasksService.cancelPersistentTask(jobId, new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() { persistentTasksService.sendRemoveRequest(jobId, new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
@Override @Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) { public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
logger.debug("Request to cancel Task for Rollup job [" + jobId + "] successful."); logger.debug("Request to cancel Task for Rollup job [" + jobId + "] successful.");
// Step 2. Wait for the task to finish cancellation internally // Step 2. Wait for the task to finish cancellation internally
persistentTasksService.waitForPersistentTaskStatus(jobId, Objects::isNull, timeout, persistentTasksService.waitForPersistentTaskCondition(jobId, Objects::isNull, timeout,
new PersistentTasksService.WaitForPersistentTaskStatusListener<RollupJob>() { new PersistentTasksService.WaitForPersistentTaskListener<RollupJob>() {
@Override @Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<RollupJob> task) { public void onResponse(PersistentTasksCustomMetaData.PersistentTask<RollupJob> task) {
logger.debug("Task for Rollup job [" + jobId + "] successfully canceled."); logger.debug("Task for Rollup job [" + jobId + "] successfully canceled.");

View File

@ -205,7 +205,7 @@ public class TransportPutRollupJobAction extends TransportMasterNodeAction<PutRo
static void startPersistentTask(RollupJob job, ActionListener<PutRollupJobAction.Response> listener, static void startPersistentTask(RollupJob job, ActionListener<PutRollupJobAction.Response> listener,
PersistentTasksService persistentTasksService) { PersistentTasksService persistentTasksService) {
persistentTasksService.startPersistentTask(job.getConfig().getId(), RollupField.TASK_NAME, job, persistentTasksService.sendStartRequest(job.getConfig().getId(), RollupField.TASK_NAME, job,
ActionListener.wrap( ActionListener.wrap(
rollupConfigPersistentTask -> waitForRollupStarted(job, listener, persistentTasksService), rollupConfigPersistentTask -> waitForRollupStarted(job, listener, persistentTasksService),
e -> { e -> {
@ -220,8 +220,8 @@ public class TransportPutRollupJobAction extends TransportMasterNodeAction<PutRo
private static void waitForRollupStarted(RollupJob job, ActionListener<PutRollupJobAction.Response> listener, private static void waitForRollupStarted(RollupJob job, ActionListener<PutRollupJobAction.Response> listener,
PersistentTasksService persistentTasksService) { PersistentTasksService persistentTasksService) {
persistentTasksService.waitForPersistentTaskStatus(job.getConfig().getId(), Objects::nonNull, job.getConfig().getTimeout(), persistentTasksService.waitForPersistentTaskCondition(job.getConfig().getId(), Objects::nonNull, job.getConfig().getTimeout(),
new PersistentTasksService.WaitForPersistentTaskStatusListener<RollupJob>() { new PersistentTasksService.WaitForPersistentTaskListener<RollupJob>() {
@Override @Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<RollupJob> task) { public void onResponse(PersistentTasksCustomMetaData.PersistentTask<RollupJob> task) {
listener.onResponse(new PutRollupJobAction.Response(true)); listener.onResponse(new PutRollupJobAction.Response(true));

View File

@ -300,11 +300,11 @@ public class PutJobStateMachineTests extends ESTestCase {
doAnswer(invocation -> { doAnswer(invocation -> {
requestCaptor.getValue().onFailure(new ResourceAlreadyExistsException(job.getConfig().getRollupIndex())); requestCaptor.getValue().onFailure(new ResourceAlreadyExistsException(job.getConfig().getRollupIndex()));
return null; return null;
}).when(tasksService).startPersistentTask(eq(job.getConfig().getId()), }).when(tasksService).sendStartRequest(eq(job.getConfig().getId()),
eq(RollupField.TASK_NAME), eq(job), requestCaptor.capture()); eq(RollupField.TASK_NAME), eq(job), requestCaptor.capture());
TransportPutRollupJobAction.startPersistentTask(job, testListener, tasksService); TransportPutRollupJobAction.startPersistentTask(job, testListener, tasksService);
verify(tasksService).startPersistentTask(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), any()); verify(tasksService).sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), any());
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -326,18 +326,18 @@ public class PutJobStateMachineTests extends ESTestCase {
mock(PersistentTasksCustomMetaData.Assignment.class)); mock(PersistentTasksCustomMetaData.Assignment.class));
requestCaptor.getValue().onResponse(response); requestCaptor.getValue().onResponse(response);
return null; return null;
}).when(tasksService).startPersistentTask(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), requestCaptor.capture()); }).when(tasksService).sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), requestCaptor.capture());
ArgumentCaptor<PersistentTasksService.WaitForPersistentTaskStatusListener> requestCaptor2 ArgumentCaptor<PersistentTasksService.WaitForPersistentTaskListener> requestCaptor2
= ArgumentCaptor.forClass(PersistentTasksService.WaitForPersistentTaskStatusListener.class); = ArgumentCaptor.forClass(PersistentTasksService.WaitForPersistentTaskListener.class);
doAnswer(invocation -> { doAnswer(invocation -> {
// Bail here with an error, further testing will happen through tests of #startPersistentTask // Bail here with an error, further testing will happen through tests of #startPersistentTask
requestCaptor2.getValue().onFailure(new RuntimeException("Ending")); requestCaptor2.getValue().onFailure(new RuntimeException("Ending"));
return null; return null;
}).when(tasksService).waitForPersistentTaskStatus(eq(job.getConfig().getId()), any(), any(), requestCaptor2.capture()); }).when(tasksService).waitForPersistentTaskCondition(eq(job.getConfig().getId()), any(), any(), requestCaptor2.capture());
TransportPutRollupJobAction.startPersistentTask(job, testListener, tasksService); TransportPutRollupJobAction.startPersistentTask(job, testListener, tasksService);
verify(tasksService).startPersistentTask(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), any()); verify(tasksService).sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), any());
verify(tasksService).waitForPersistentTaskStatus(eq(job.getConfig().getId()), any(), any(), any()); verify(tasksService).waitForPersistentTaskCondition(eq(job.getConfig().getId()), any(), any(), any());
} }
} }