Persistent Tasks: Add waitForPersistentTaskStatus method (elastic/x-pack-elasticsearch#901)

This method allows to wait for tasks to change their status to match the supplied predicate.

Original commit: elastic/x-pack-elasticsearch@9f5d4104a0
This commit is contained in:
Igor Motov 2017-03-31 16:05:34 -04:00 committed by GitHub
parent 232190df97
commit d7e4390490
6 changed files with 75 additions and 29 deletions

View File

@ -303,7 +303,7 @@ public class MachineLearning implements ActionPlugin {
InvalidLicenseEnforcer invalidLicenseEnforcer = InvalidLicenseEnforcer invalidLicenseEnforcer =
new InvalidLicenseEnforcer(settings, licenseState, threadPool, datafeedJobRunner, autodetectProcessManager); new InvalidLicenseEnforcer(settings, licenseState, threadPool, datafeedJobRunner, autodetectProcessManager);
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, internalClient); PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, threadPool, internalClient);
PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Arrays.asList( PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Arrays.asList(
new OpenJobAction.OpenJobPersistentTasksExecutor(settings, threadPool, licenseState, persistentTasksService, clusterService, new OpenJobAction.OpenJobPersistentTasksExecutor(settings, threadPool, licenseState, persistentTasksService, clusterService,
autodetectProcessManager, auditor), autodetectProcessManager, auditor),

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.persistent;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractNamedDiffable; import org.elasticsearch.cluster.AbstractNamedDiffable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -169,6 +170,15 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return PERSISTENT_TASKS_PARSER.parse(parser, null).build(); return PERSISTENT_TASKS_PARSER.parse(parser, null).build();
} }
@SuppressWarnings("unchecked")
public static <Request extends PersistentTaskRequest> PersistentTask<Request> getTaskWithId(ClusterState clusterState, long taskId) {
PersistentTasksCustomMetaData tasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
if (tasks != null) {
return (PersistentTask<Request>)tasks.getTask(taskId);
}
return null;
}
public static class Assignment { public static class Assignment {
@Nullable @Nullable
private final String executorNode; private final String executorNode;
@ -215,8 +225,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
public static final Assignment INITIAL_ASSIGNMENT = new Assignment(null, "waiting for initial assignment"); public static final Assignment INITIAL_ASSIGNMENT = new Assignment(null, "waiting for initial assignment");
public static final Assignment FINISHED_TASK_ASSIGNMENT = new Assignment(null, "task has finished");
/** /**
* A record that represents a single running persistent task * A record that represents a single running persistent task
*/ */

View File

@ -8,12 +8,21 @@ package org.elasticsearch.xpack.persistent;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.util.function.Predicate;
/** /**
* This service is used by persistent actions to propagate changes in the action state and notify about completion * This service is used by persistent actions to propagate changes in the action state and notify about completion
@ -22,11 +31,13 @@ public class PersistentTasksService extends AbstractComponent {
private final Client client; private final Client client;
private final ClusterService clusterService; private final ClusterService clusterService;
private final ThreadPool threadPool;
public PersistentTasksService(Settings settings, ClusterService clusterService, Client client) { public PersistentTasksService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Client client) {
super(settings); super(settings);
this.client = client; this.client = client;
this.clusterService = clusterService; this.clusterService = clusterService;
this.threadPool = threadPool;
} }
/** /**
@ -102,15 +113,33 @@ public class PersistentTasksService extends AbstractComponent {
} }
/** /**
* Starts a persistent task * Waits for the persistent task with giving id (taskId) to achieve the desired status.
*/ */
public void startTask(long taskId, PersistentTaskOperationListener listener) { public void waitForPersistentTaskStatus(long taskId, Predicate<PersistentTask<?>> predicate, @Nullable TimeValue timeout,
StartPersistentTaskAction.Request startRequest = new StartPersistentTaskAction.Request(taskId); WaitForPersistentTaskStatusListener listener) {
try { ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext());
client.execute(StartPersistentTaskAction.INSTANCE, startRequest, ActionListener.wrap(o -> listener.onResponse(taskId), stateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
listener::onFailure)); @Override
} catch (Exception e) { public void onNewClusterState(ClusterState state) {
listener.onFailure(e); listener.onResponse(taskId);
}
@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
listener.onTimeout(timeout);
}
}, clusterState -> predicate.test(PersistentTasksCustomMetaData.getTaskWithId(clusterState, taskId)));
}
public interface WaitForPersistentTaskStatusListener extends PersistentTaskOperationListener {
default void onTimeout(TimeValue timeout) {
onFailure(new IllegalStateException("timed out after " + timeout));
} }
} }

View File

@ -6,12 +6,13 @@
package org.elasticsearch.xpack.persistent; package org.elasticsearch.xpack.persistent;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.common.util.concurrent.BaseFuture;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener; import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestRequest; import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestRequest;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestTasksRequestBuilder; import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestTasksRequestBuilder;
@ -20,12 +21,10 @@ import org.junit.After;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Objects;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, minNumDataNodes = 2) @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, minNumDataNodes = 2)
@ -50,7 +49,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
assertNoRunningTasks(); assertNoRunningTasks();
} }
public static class PersistentTaskOperationFuture extends BaseFuture<Long> implements PersistentTaskOperationListener { public static class PersistentTaskOperationFuture extends BaseFuture<Long> implements WaitForPersistentTaskStatusListener {
@Override @Override
public void onResponse(long taskId) { public void onResponse(long taskId) {
@ -152,7 +151,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PersistentTaskOperationFuture future = new PersistentTaskOperationFuture(); PersistentTaskOperationFuture future = new PersistentTaskOperationFuture();
persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
future.get(); long taskId = future.get();
assertBusy(() -> { assertBusy(() -> {
// Wait for the task to start // Wait for the task to start
@ -175,20 +174,30 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
.get().getTasks().size(), equalTo(1)); .get().getTasks().size(), equalTo(1));
int finalI = i; int finalI = i;
assertBusy(() -> { PersistentTaskOperationFuture future1 = new PersistentTaskOperationFuture();
PersistentTasksCustomMetaData tasks = internalCluster().clusterService().state().getMetaData() persistentTasksService.waitForPersistentTaskStatus(taskId,
.custom(PersistentTasksCustomMetaData.TYPE); task -> task != null && task.isCurrentStatus()&& task.getStatus().toString() != null &&
assertThat(tasks.tasks().size(), equalTo(1)); task.getStatus().toString().equals("{\"phase\":\"phase " + (finalI + 1) + "\"}"),
assertThat(tasks.tasks().iterator().next().getStatus(), notNullValue()); TimeValue.timeValueSeconds(10), future1);
assertThat(tasks.tasks().iterator().next().getStatus().toString(), equalTo("{\"phase\":\"phase " + (finalI + 1) + "\"}")); assertThat(future1.get(), equalTo(taskId));
});
} }
PersistentTaskOperationFuture future1 = new PersistentTaskOperationFuture();
persistentTasksService.waitForPersistentTaskStatus(taskId,
task -> false, TimeValue.timeValueMillis(10), future1);
expectThrows(Exception.class, future1::get);
// Wait for the task to disappear
PersistentTaskOperationFuture future2 = new PersistentTaskOperationFuture();
persistentTasksService.waitForPersistentTaskStatus(taskId, Objects::isNull, TimeValue.timeValueSeconds(10), future2);
logger.info("Completing the running task"); logger.info("Completing the running task");
// Complete the running task and make sure it finishes properly // Complete the running task and make sure it finishes properly
assertThat(new TestTasksRequestBuilder(client()).setOperation("finish").setTaskId(firstRunningTask.getTaskId()) assertThat(new TestTasksRequestBuilder(client()).setOperation("finish").setTaskId(firstRunningTask.getTaskId())
.get().getTasks().size(), equalTo(1)); .get().getTasks().size(), equalTo(1));
assertThat(future2.get(), equalTo(taskId));
} }

View File

@ -150,7 +150,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
ClusterService clusterService = createClusterService(); ClusterService clusterService = createClusterService();
AtomicLong capturedTaskId = new AtomicLong(); AtomicLong capturedTaskId = new AtomicLong();
AtomicReference<PersistentTaskOperationListener> capturedListener = new AtomicReference<>(); AtomicReference<PersistentTaskOperationListener> capturedListener = new AtomicReference<>();
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, null, null) { PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, null, null, null) {
@Override @Override
public void sendCancellation(long taskId, PersistentTaskOperationListener listener) { public void sendCancellation(long taskId, PersistentTaskOperationListener listener) {
capturedTaskId.set(taskId); capturedTaskId.set(taskId);
@ -228,7 +228,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
AtomicReference<Exception> capturedException = new AtomicReference<>(); AtomicReference<Exception> capturedException = new AtomicReference<>();
AtomicReference<PersistentTaskOperationListener> capturedListener = new AtomicReference<>(); AtomicReference<PersistentTaskOperationListener> capturedListener = new AtomicReference<>();
PersistentTasksService persistentTasksService = PersistentTasksService persistentTasksService =
new PersistentTasksService(Settings.EMPTY, clusterService, null) { new PersistentTasksService(Settings.EMPTY, clusterService, null, null) {
@Override @Override
public void sendCompletionNotification(long taskId, Exception failure, PersistentTaskOperationListener listener) { public void sendCompletionNotification(long taskId, Exception failure, PersistentTaskOperationListener listener) {
capturedTaskId.set(taskId); capturedTaskId.set(taskId);

View File

@ -89,7 +89,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool, public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
ResourceWatcherService resourceWatcherService, ScriptService scriptService, ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry) { NamedXContentRegistry xContentRegistry) {
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, client); PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, threadPool, client);
TestPersistentTasksExecutor testPersistentAction = new TestPersistentTasksExecutor(Settings.EMPTY, persistentTasksService, TestPersistentTasksExecutor testPersistentAction = new TestPersistentTasksExecutor(Settings.EMPTY, persistentTasksService,
clusterService); clusterService);
PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY, PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY,