diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java index 769676bcd6d..0a869ff296a 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java @@ -38,9 +38,9 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.tasks.PersistedTaskInfo; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.tasks.PersistedTaskInfo; import org.elasticsearch.tasks.TaskPersistenceService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BaseTransportResponseHandler; @@ -51,7 +51,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import static org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction.waitForCompletionTimeout; -import static org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction.waitForTaskCompletion; /** * Action to get a single task. If the task isn't running then it'll try to request the status from request index. @@ -148,7 +147,7 @@ public class TransportGetTaskAction extends HandledTransportAction { - public static void waitForTaskCompletion(TaskManager taskManager, Task task, long untilInNanos) { - while (System.nanoTime() - untilInNanos < 0) { - if (taskManager.getTask(task.getId()) == null) { - return; - } - try { - Thread.sleep(WAIT_FOR_COMPLETION_POLL.millis()); - } catch (InterruptedException e) { - throw new ElasticsearchException("Interrupted waiting for completion of [{}]", e, task); - } - } - throw new ElasticsearchTimeoutException("Timed out waiting for completion of [{}]", task); - } public static long waitForCompletionTimeout(TimeValue timeout) { if (timeout == null) { timeout = DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT; @@ -69,7 +52,6 @@ public class TransportListTasksAction extends TransportTasksAction tasks = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency(); @@ -341,6 +347,23 @@ public class TaskManager extends AbstractComponent implements ClusterStateListen } } + /** + * Blocks the calling thread, waiting for the task to vanish from the TaskManager. + */ + public void waitForTaskCompletion(Task task, long untilInNanos) { + while (System.nanoTime() - untilInNanos < 0) { + if (getTask(task.getId()) == null) { + return; + } + try { + Thread.sleep(WAIT_FOR_COMPLETION_POLL.millis()); + } catch (InterruptedException e) { + throw new ElasticsearchException("Interrupted waiting for completion of [{}]", e, task); + } + } + throw new ElasticsearchTimeoutException("Timed out waiting for completion of [{}]", task); + } + private static class CancellableTaskHolder { private static final String TASK_FINISHED_MARKER = "task finished"; diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/RecordingTaskManagerListener.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/RecordingTaskManagerListener.java index 2aad88c99a9..656a5ab9ec4 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/RecordingTaskManagerListener.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/RecordingTaskManagerListener.java @@ -60,6 +60,10 @@ public class RecordingTaskManagerListener implements MockTaskManagerListener { } } + @Override + public void waitForTaskCompletion(Task task) { + } + public synchronized List> getEvents() { return Collections.unmodifiableList(new ArrayList<>(events)); } diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index a2938876942..4c720536d0c 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -363,6 +363,10 @@ public class TasksIT extends ESIntegTestCase { taskFinishLock.lock(); taskFinishLock.unlock(); } + + @Override + public void waitForTaskCompletion(Task task) { + } }); } indexFuture = client().prepareIndex("test", "test").setSource("test", "test").execute(); @@ -470,8 +474,30 @@ public class TasksIT extends ESIntegTestCase { // Wait for the task to start assertBusy(() -> client().admin().cluster().prepareGetTask(taskId).get()); - // Spin up a request to wait for that task to finish + // Register listeners so we can be sure the waiting started + CountDownLatch waitForWaitingToStart = new CountDownLatch(1); + for (TransportService transportService : internalCluster().getInstances(TransportService.class)) { + ((MockTaskManager) transportService.getTaskManager()).addListener(new MockTaskManagerListener() { + @Override + public void waitForTaskCompletion(Task task) { + } + + @Override + public void onTaskRegistered(Task task) { + } + + @Override + public void onTaskUnregistered(Task task) { + waitForWaitingToStart.countDown(); + } + }); + } + + // Spin up a request to wait for the test task to finish waitResponseFuture = wait.apply(taskId); + + // Wait for the wait to start + waitForWaitingToStart.await(); } finally { // Unblock the request so the wait for completion request can finish TestTaskPlugin.UnblockTestTasksAction.INSTANCE.newRequestBuilder(client()).get(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java b/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java index fc090e151a3..b0d16d10c49 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java +++ b/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java @@ -75,6 +75,18 @@ public class MockTaskManager extends TaskManager { return removedTask; } + @Override + public void waitForTaskCompletion(Task task, long untilInNanos) { + for (MockTaskManagerListener listener : listeners) { + try { + listener.waitForTaskCompletion(task); + } catch (Throwable t) { + logger.warn("failed to notify task manager listener about waitForTaskCompletion the task with id {}", t, task.getId()); + } + } + super.waitForTaskCompletion(task, untilInNanos); + } + public void addListener(MockTaskManagerListener listener) { listeners.add(listener); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManagerListener.java b/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManagerListener.java index d10dd357999..ede880c8f54 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManagerListener.java +++ b/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManagerListener.java @@ -28,4 +28,6 @@ public interface MockTaskManagerListener { void onTaskRegistered(Task task); void onTaskUnregistered(Task task); + + void waitForTaskCompletion(Task task); }