Persistent Tasks: Add waitForPersistentTaskStatus method (#901)
This method allows to wait for tasks to change their status to match the supplied predicate.
This commit is contained in:
parent
78b844e79b
commit
6ca044736e
|
@ -20,6 +20,7 @@ package org.elasticsearch.persistent;
|
|||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.AbstractNamedDiffable;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.NamedDiff;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
|
@ -182,6 +183,15 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
|||
return PERSISTENT_TASKS_PARSER.parse(parser, null).build();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <Request extends PersistentTaskRequest> PersistentTask<Request> getTaskWithId(ClusterState clusterState, long taskId) {
|
||||
PersistentTasksCustomMetaData tasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
if (tasks != null) {
|
||||
return (PersistentTask<Request>)tasks.getTask(taskId);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static class Assignment {
|
||||
@Nullable
|
||||
private final String executorNode;
|
||||
|
@ -228,8 +238,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
|||
|
||||
public static final Assignment INITIAL_ASSIGNMENT = new Assignment(null, "waiting for initial assignment");
|
||||
|
||||
public static final Assignment FINISHED_TASK_ASSIGNMENT = new Assignment(null, "task has finished");
|
||||
|
||||
/**
|
||||
* A record that represents a single running persistent task
|
||||
*/
|
||||
|
|
|
@ -21,12 +21,21 @@ package org.elasticsearch.persistent;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateObserver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
|
||||
import java.util.function.Predicate;
|
||||
|
||||
/**
|
||||
* This service is used by persistent actions to propagate changes in the action state and notify about completion
|
||||
|
@ -35,11 +44,13 @@ public class PersistentTasksService extends AbstractComponent {
|
|||
|
||||
private final Client client;
|
||||
private final ClusterService clusterService;
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
public PersistentTasksService(Settings settings, ClusterService clusterService, Client client) {
|
||||
public PersistentTasksService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Client client) {
|
||||
super(settings);
|
||||
this.client = client;
|
||||
this.clusterService = clusterService;
|
||||
this.threadPool = threadPool;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -115,15 +126,33 @@ public class PersistentTasksService extends AbstractComponent {
|
|||
}
|
||||
|
||||
/**
|
||||
* Starts a persistent task
|
||||
* Waits for the persistent task with giving id (taskId) to achieve the desired status.
|
||||
*/
|
||||
public void startTask(long taskId, PersistentTaskOperationListener listener) {
|
||||
StartPersistentTaskAction.Request startRequest = new StartPersistentTaskAction.Request(taskId);
|
||||
try {
|
||||
client.execute(StartPersistentTaskAction.INSTANCE, startRequest, ActionListener.wrap(o -> listener.onResponse(taskId),
|
||||
listener::onFailure));
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
public void waitForPersistentTaskStatus(long taskId, Predicate<PersistentTask<?>> predicate, @Nullable TimeValue timeout,
|
||||
WaitForPersistentTaskStatusListener listener) {
|
||||
ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext());
|
||||
stateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
|
||||
@Override
|
||||
public void onNewClusterState(ClusterState state) {
|
||||
listener.onResponse(taskId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClusterServiceClose() {
|
||||
listener.onFailure(new NodeClosedException(clusterService.localNode()));
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
listener.onTimeout(timeout);
|
||||
}
|
||||
}, clusterState -> predicate.test(PersistentTasksCustomMetaData.getTaskWithId(clusterState, taskId)));
|
||||
}
|
||||
|
||||
public interface WaitForPersistentTaskStatusListener extends PersistentTaskOperationListener {
|
||||
default void onTimeout(TimeValue timeout) {
|
||||
onFailure(new IllegalStateException("timed out after " + timeout));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -20,12 +20,13 @@
|
|||
package org.elasticsearch.persistent;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.BaseFuture;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.tasks.TaskInfo;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.persistent.PersistentTasksService.PersistentTaskOperationListener;
|
||||
import org.elasticsearch.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
|
||||
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
|
||||
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestRequest;
|
||||
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestTasksRequestBuilder;
|
||||
|
@ -34,12 +35,10 @@ import org.junit.After;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, minNumDataNodes = 2)
|
||||
|
@ -64,7 +63,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
|
|||
assertNoRunningTasks();
|
||||
}
|
||||
|
||||
public static class PersistentTaskOperationFuture extends BaseFuture<Long> implements PersistentTaskOperationListener {
|
||||
public static class PersistentTaskOperationFuture extends BaseFuture<Long> implements WaitForPersistentTaskStatusListener {
|
||||
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
|
@ -166,7 +165,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
|
|||
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
|
||||
PersistentTaskOperationFuture future = new PersistentTaskOperationFuture();
|
||||
persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
|
||||
future.get();
|
||||
long taskId = future.get();
|
||||
|
||||
assertBusy(() -> {
|
||||
// Wait for the task to start
|
||||
|
@ -189,20 +188,30 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
|
|||
.get().getTasks().size(), equalTo(1));
|
||||
|
||||
int finalI = i;
|
||||
assertBusy(() -> {
|
||||
PersistentTasksCustomMetaData tasks = internalCluster().clusterService().state().getMetaData()
|
||||
.custom(PersistentTasksCustomMetaData.TYPE);
|
||||
assertThat(tasks.tasks().size(), equalTo(1));
|
||||
assertThat(tasks.tasks().iterator().next().getStatus(), notNullValue());
|
||||
assertThat(tasks.tasks().iterator().next().getStatus().toString(), equalTo("{\"phase\":\"phase " + (finalI + 1) + "\"}"));
|
||||
});
|
||||
|
||||
PersistentTaskOperationFuture future1 = new PersistentTaskOperationFuture();
|
||||
persistentTasksService.waitForPersistentTaskStatus(taskId,
|
||||
task -> task != null && task.isCurrentStatus()&& task.getStatus().toString() != null &&
|
||||
task.getStatus().toString().equals("{\"phase\":\"phase " + (finalI + 1) + "\"}"),
|
||||
TimeValue.timeValueSeconds(10), future1);
|
||||
assertThat(future1.get(), equalTo(taskId));
|
||||
}
|
||||
|
||||
PersistentTaskOperationFuture future1 = new PersistentTaskOperationFuture();
|
||||
persistentTasksService.waitForPersistentTaskStatus(taskId,
|
||||
task -> false, TimeValue.timeValueMillis(10), future1);
|
||||
|
||||
expectThrows(Exception.class, future1::get);
|
||||
|
||||
// Wait for the task to disappear
|
||||
PersistentTaskOperationFuture future2 = new PersistentTaskOperationFuture();
|
||||
persistentTasksService.waitForPersistentTaskStatus(taskId, Objects::isNull, TimeValue.timeValueSeconds(10), future2);
|
||||
|
||||
logger.info("Completing the running task");
|
||||
// Complete the running task and make sure it finishes properly
|
||||
assertThat(new TestTasksRequestBuilder(client()).setOperation("finish").setTaskId(firstRunningTask.getTaskId())
|
||||
.get().getTasks().size(), equalTo(1));
|
||||
|
||||
assertThat(future2.get(), equalTo(taskId));
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -164,7 +164,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
|||
ClusterService clusterService = createClusterService();
|
||||
AtomicLong capturedTaskId = new AtomicLong();
|
||||
AtomicReference<PersistentTaskOperationListener> capturedListener = new AtomicReference<>();
|
||||
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, null, null) {
|
||||
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, null, null, null) {
|
||||
@Override
|
||||
public void sendCancellation(long taskId, PersistentTaskOperationListener listener) {
|
||||
capturedTaskId.set(taskId);
|
||||
|
@ -242,7 +242,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
|||
AtomicReference<Exception> capturedException = new AtomicReference<>();
|
||||
AtomicReference<PersistentTaskOperationListener> capturedListener = new AtomicReference<>();
|
||||
PersistentTasksService persistentTasksService =
|
||||
new PersistentTasksService(Settings.EMPTY, clusterService, null) {
|
||||
new PersistentTasksService(Settings.EMPTY, clusterService, null, null) {
|
||||
@Override
|
||||
public void sendCompletionNotification(long taskId, Exception failure, PersistentTaskOperationListener listener) {
|
||||
capturedTaskId.set(taskId);
|
||||
|
|
|
@ -103,7 +103,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
|
|||
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
|
||||
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
|
||||
NamedXContentRegistry xContentRegistry) {
|
||||
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, client);
|
||||
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, threadPool, client);
|
||||
TestPersistentTasksExecutor testPersistentAction = new TestPersistentTasksExecutor(Settings.EMPTY, persistentTasksService,
|
||||
clusterService);
|
||||
PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY,
|
||||
|
|
Loading…
Reference in New Issue