Move waitForTaskCompletion into TaskManager

This allows for listening for the waiting to start using
MockTaskManager. This allows us to work around a race condition
in the TasksIT.
This commit is contained in:
Nik Everett 2016-06-16 09:43:14 -04:00
parent 0faa9409b3
commit 5aa4769b25
7 changed files with 71 additions and 23 deletions

View File

@ -38,9 +38,9 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.tasks.PersistedTaskInfo;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.PersistedTaskInfo;
import org.elasticsearch.tasks.TaskPersistenceService; import org.elasticsearch.tasks.TaskPersistenceService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportResponseHandler; import org.elasticsearch.transport.BaseTransportResponseHandler;
@ -51,7 +51,6 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException; import java.io.IOException;
import static org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction.waitForCompletionTimeout; import static org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction.waitForCompletionTimeout;
import static org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction.waitForTaskCompletion;
/** /**
* Action to get a single task. If the task isn't running then it'll try to request the status from request index. * Action to get a single task. If the task isn't running then it'll try to request the status from request index.
@ -148,7 +147,7 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
threadPool.generic().execute(new AbstractRunnable() { threadPool.generic().execute(new AbstractRunnable() {
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
waitForTaskCompletion(taskManager, runningTask, waitForCompletionTimeout(request.getTimeout())); taskManager.waitForTaskCompletion(runningTask, waitForCompletionTimeout(request.getTimeout()));
// TODO look up the task's result from the .tasks index now that it is done // TODO look up the task's result from the .tasks index now that it is done
listener.onResponse( listener.onResponse(
new GetTaskResponse(new PersistedTaskInfo(runningTask.taskInfo(clusterService.localNode(), true)))); new GetTaskResponse(new PersistedTaskInfo(runningTask.taskInfo(clusterService.localNode(), true))));

View File

@ -19,8 +19,6 @@
package org.elasticsearch.action.admin.cluster.node.tasks.list; package org.elasticsearch.action.admin.cluster.node.tasks.list;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
@ -34,7 +32,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -42,26 +39,12 @@ import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.function.Consumer; import java.util.function.Consumer;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
/** /**
* *
*/ */
public class TransportListTasksAction extends TransportTasksAction<Task, ListTasksRequest, ListTasksResponse, TaskInfo> { public class TransportListTasksAction extends TransportTasksAction<Task, ListTasksRequest, ListTasksResponse, TaskInfo> {
public static void waitForTaskCompletion(TaskManager taskManager, Task task, long untilInNanos) {
while (System.nanoTime() - untilInNanos < 0) {
if (taskManager.getTask(task.getId()) == null) {
return;
}
try {
Thread.sleep(WAIT_FOR_COMPLETION_POLL.millis());
} catch (InterruptedException e) {
throw new ElasticsearchException("Interrupted waiting for completion of [{}]", e, task);
}
}
throw new ElasticsearchTimeoutException("Timed out waiting for completion of [{}]", task);
}
public static long waitForCompletionTimeout(TimeValue timeout) { public static long waitForCompletionTimeout(TimeValue timeout) {
if (timeout == null) { if (timeout == null) {
timeout = DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT; timeout = DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT;
@ -69,7 +52,6 @@ public class TransportListTasksAction extends TransportTasksAction<Task, ListTas
return System.nanoTime() + timeout.nanos(); return System.nanoTime() + timeout.nanos();
} }
private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100);
private static final TimeValue DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT = timeValueSeconds(30); private static final TimeValue DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT = timeValueSeconds(30);
@Inject @Inject
@ -105,7 +87,7 @@ public class TransportListTasksAction extends TransportTasksAction<Task, ListTas
// for itself or one of its child tasks // for itself or one of its child tasks
return; return;
} }
waitForTaskCompletion(taskManager, task, timeoutNanos); taskManager.waitForTaskCompletion(task, timeoutNanos);
}); });
} }
super.processTasks(request, operation); super.processTasks(request, operation);

View File

@ -19,6 +19,8 @@
package org.elasticsearch.tasks; package org.elasticsearch.tasks;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
@ -28,6 +30,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequest;
@ -43,10 +46,13 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer; import java.util.function.Consumer;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
/** /**
* Task Manager service for keeping track of currently running tasks on the nodes * Task Manager service for keeping track of currently running tasks on the nodes
*/ */
public class TaskManager extends AbstractComponent implements ClusterStateListener { public class TaskManager extends AbstractComponent implements ClusterStateListener {
private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100);
private final ConcurrentMapLong<Task> tasks = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency(); private final ConcurrentMapLong<Task> tasks = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency();
@ -341,6 +347,23 @@ public class TaskManager extends AbstractComponent implements ClusterStateListen
} }
} }
/**
* Blocks the calling thread, waiting for the task to vanish from the TaskManager.
*/
public void waitForTaskCompletion(Task task, long untilInNanos) {
while (System.nanoTime() - untilInNanos < 0) {
if (getTask(task.getId()) == null) {
return;
}
try {
Thread.sleep(WAIT_FOR_COMPLETION_POLL.millis());
} catch (InterruptedException e) {
throw new ElasticsearchException("Interrupted waiting for completion of [{}]", e, task);
}
}
throw new ElasticsearchTimeoutException("Timed out waiting for completion of [{}]", task);
}
private static class CancellableTaskHolder { private static class CancellableTaskHolder {
private static final String TASK_FINISHED_MARKER = "task finished"; private static final String TASK_FINISHED_MARKER = "task finished";

View File

@ -60,6 +60,10 @@ public class RecordingTaskManagerListener implements MockTaskManagerListener {
} }
} }
@Override
public void waitForTaskCompletion(Task task) {
}
public synchronized List<Tuple<Boolean, TaskInfo>> getEvents() { public synchronized List<Tuple<Boolean, TaskInfo>> getEvents() {
return Collections.unmodifiableList(new ArrayList<>(events)); return Collections.unmodifiableList(new ArrayList<>(events));
} }

View File

@ -363,6 +363,10 @@ public class TasksIT extends ESIntegTestCase {
taskFinishLock.lock(); taskFinishLock.lock();
taskFinishLock.unlock(); taskFinishLock.unlock();
} }
@Override
public void waitForTaskCompletion(Task task) {
}
}); });
} }
indexFuture = client().prepareIndex("test", "test").setSource("test", "test").execute(); indexFuture = client().prepareIndex("test", "test").setSource("test", "test").execute();
@ -470,8 +474,30 @@ public class TasksIT extends ESIntegTestCase {
// Wait for the task to start // Wait for the task to start
assertBusy(() -> client().admin().cluster().prepareGetTask(taskId).get()); assertBusy(() -> client().admin().cluster().prepareGetTask(taskId).get());
// Spin up a request to wait for that task to finish // Register listeners so we can be sure the waiting started
CountDownLatch waitForWaitingToStart = new CountDownLatch(1);
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
((MockTaskManager) transportService.getTaskManager()).addListener(new MockTaskManagerListener() {
@Override
public void waitForTaskCompletion(Task task) {
}
@Override
public void onTaskRegistered(Task task) {
}
@Override
public void onTaskUnregistered(Task task) {
waitForWaitingToStart.countDown();
}
});
}
// Spin up a request to wait for the test task to finish
waitResponseFuture = wait.apply(taskId); waitResponseFuture = wait.apply(taskId);
// Wait for the wait to start
waitForWaitingToStart.await();
} finally { } finally {
// Unblock the request so the wait for completion request can finish // Unblock the request so the wait for completion request can finish
TestTaskPlugin.UnblockTestTasksAction.INSTANCE.newRequestBuilder(client()).get(); TestTaskPlugin.UnblockTestTasksAction.INSTANCE.newRequestBuilder(client()).get();

View File

@ -75,6 +75,18 @@ public class MockTaskManager extends TaskManager {
return removedTask; return removedTask;
} }
@Override
public void waitForTaskCompletion(Task task, long untilInNanos) {
for (MockTaskManagerListener listener : listeners) {
try {
listener.waitForTaskCompletion(task);
} catch (Throwable t) {
logger.warn("failed to notify task manager listener about waitForTaskCompletion the task with id {}", t, task.getId());
}
}
super.waitForTaskCompletion(task, untilInNanos);
}
public void addListener(MockTaskManagerListener listener) { public void addListener(MockTaskManagerListener listener) {
listeners.add(listener); listeners.add(listener);
} }

View File

@ -28,4 +28,6 @@ public interface MockTaskManagerListener {
void onTaskRegistered(Task task); void onTaskRegistered(Task task);
void onTaskUnregistered(Task task); void onTaskUnregistered(Task task);
void waitForTaskCompletion(Task task);
} }