From 6ca044736e82db5ff30a2f0e86788a4f814562ed Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Fri, 31 Mar 2017 16:05:34 -0400 Subject: [PATCH] Persistent Tasks: Add waitForPersistentTaskStatus method (#901) This method allows to wait for tasks to change their status to match the supplied predicate. --- .../PersistentTasksCustomMetaData.java | 12 ++++- .../persistent/PersistentTasksService.java | 47 +++++++++++++++---- .../persistent/PersistentTasksExecutorIT.java | 37 +++++++++------ .../PersistentTasksNodeServiceTests.java | 4 +- .../persistent/TestPersistentTasksPlugin.java | 2 +- 5 files changed, 74 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java index a0f6fa3d9f7..a101da93e5a 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java @@ -20,6 +20,7 @@ package org.elasticsearch.persistent; import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractNamedDiffable; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -182,6 +183,15 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable PersistentTask getTaskWithId(ClusterState clusterState, long taskId) { + PersistentTasksCustomMetaData tasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); + if (tasks != null) { + return (PersistentTask)tasks.getTask(taskId); + } + return null; + } + public static class Assignment { @Nullable private final String executorNode; @@ -228,8 +238,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable listener.onResponse(taskId), - listener::onFailure)); - } catch (Exception e) { - listener.onFailure(e); + public void waitForPersistentTaskStatus(long taskId, Predicate> predicate, @Nullable TimeValue timeout, + WaitForPersistentTaskStatusListener listener) { + ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext()); + stateObserver.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + listener.onResponse(taskId); + } + + @Override + public void onClusterServiceClose() { + listener.onFailure(new NodeClosedException(clusterService.localNode())); + + } + + @Override + public void onTimeout(TimeValue timeout) { + listener.onTimeout(timeout); + } + }, clusterState -> predicate.test(PersistentTasksCustomMetaData.getTaskWithId(clusterState, taskId))); + } + + public interface WaitForPersistentTaskStatusListener extends PersistentTaskOperationListener { + default void onTimeout(TimeValue timeout) { + onFailure(new IllegalStateException("timed out after " + timeout)); } } diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java index 6bc3d297be4..3b9eeb68ed5 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java @@ -20,12 +20,13 @@ package org.elasticsearch.persistent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.persistent.PersistentTasksService.PersistentTaskOperationListener; +import org.elasticsearch.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestRequest; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestTasksRequestBuilder; @@ -34,12 +35,10 @@ import org.junit.After; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Objects; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, minNumDataNodes = 2) @@ -64,7 +63,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { assertNoRunningTasks(); } - public static class PersistentTaskOperationFuture extends BaseFuture implements PersistentTaskOperationListener { + public static class PersistentTaskOperationFuture extends BaseFuture implements WaitForPersistentTaskStatusListener { @Override public void onResponse(long taskId) { @@ -166,7 +165,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PersistentTaskOperationFuture future = new PersistentTaskOperationFuture(); persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); - future.get(); + long taskId = future.get(); assertBusy(() -> { // Wait for the task to start @@ -189,20 +188,30 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { .get().getTasks().size(), equalTo(1)); int finalI = i; - assertBusy(() -> { - PersistentTasksCustomMetaData tasks = internalCluster().clusterService().state().getMetaData() - .custom(PersistentTasksCustomMetaData.TYPE); - assertThat(tasks.tasks().size(), equalTo(1)); - assertThat(tasks.tasks().iterator().next().getStatus(), notNullValue()); - assertThat(tasks.tasks().iterator().next().getStatus().toString(), equalTo("{\"phase\":\"phase " + (finalI + 1) + "\"}")); - }); - + PersistentTaskOperationFuture future1 = new PersistentTaskOperationFuture(); + persistentTasksService.waitForPersistentTaskStatus(taskId, + task -> task != null && task.isCurrentStatus()&& task.getStatus().toString() != null && + task.getStatus().toString().equals("{\"phase\":\"phase " + (finalI + 1) + "\"}"), + TimeValue.timeValueSeconds(10), future1); + assertThat(future1.get(), equalTo(taskId)); } + PersistentTaskOperationFuture future1 = new PersistentTaskOperationFuture(); + persistentTasksService.waitForPersistentTaskStatus(taskId, + task -> false, TimeValue.timeValueMillis(10), future1); + + expectThrows(Exception.class, future1::get); + + // Wait for the task to disappear + PersistentTaskOperationFuture future2 = new PersistentTaskOperationFuture(); + persistentTasksService.waitForPersistentTaskStatus(taskId, Objects::isNull, TimeValue.timeValueSeconds(10), future2); + logger.info("Completing the running task"); // Complete the running task and make sure it finishes properly assertThat(new TestTasksRequestBuilder(client()).setOperation("finish").setTaskId(firstRunningTask.getTaskId()) .get().getTasks().size(), equalTo(1)); + + assertThat(future2.get(), equalTo(taskId)); } diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java index 5f74f6963c7..576b7c61722 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java @@ -164,7 +164,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { ClusterService clusterService = createClusterService(); AtomicLong capturedTaskId = new AtomicLong(); AtomicReference capturedListener = new AtomicReference<>(); - PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, null, null) { + PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, null, null, null) { @Override public void sendCancellation(long taskId, PersistentTaskOperationListener listener) { capturedTaskId.set(taskId); @@ -242,7 +242,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { AtomicReference capturedException = new AtomicReference<>(); AtomicReference capturedListener = new AtomicReference<>(); PersistentTasksService persistentTasksService = - new PersistentTasksService(Settings.EMPTY, clusterService, null) { + new PersistentTasksService(Settings.EMPTY, clusterService, null, null) { @Override public void sendCompletionNotification(long taskId, Exception failure, PersistentTaskOperationListener listener) { capturedTaskId.set(taskId); diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java index 57f3c7a502e..7bbba5c916c 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java @@ -103,7 +103,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { public Collection createComponents(Client client, ClusterService clusterService, ThreadPool threadPool, ResourceWatcherService resourceWatcherService, ScriptService scriptService, NamedXContentRegistry xContentRegistry) { - PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, client); + PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, threadPool, client); TestPersistentTasksExecutor testPersistentAction = new TestPersistentTasksExecutor(Settings.EMPTY, persistentTasksService, clusterService); PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY,