Rework the basic IT for GETing running tasks

This integ test relied on the false assumption that
`MockTaskManagerListener#onTaskUnregistered` was called *before* the
task was unregistered. It is in fact called after the task is unregistered.
This mistake led the test to *rarely* miss the task it was looking
for and fail.

Found by https://elasticsearch-ci.elastic.co/job/elastic+elasticsearch+5.x+multijob-unix-compatibility/os=ubuntu/4/consoleText
This commit is contained in:
Nik Everett 2016-09-12 17:29:14 -04:00
parent c84bc25500
commit afbd7cbeb8
2 changed files with 24 additions and 34 deletions

View File

@ -216,7 +216,7 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
public void onFailure(Exception e) { public void onFailure(Exception e) {
if (ExceptionsHelper.unwrap(e, IndexNotFoundException.class) != null) { if (ExceptionsHelper.unwrap(e, IndexNotFoundException.class) != null) {
// We haven't yet created the index for the task results so it can't be found. // We haven't yet created the index for the task results so it can't be found.
listener.onFailure(new ResourceNotFoundException("task [{}] isn't running or stored its results", e, listener.onFailure(new ResourceNotFoundException("task [{}] isn't running and hasn't stored its results", e,
request.getTaskId())); request.getTaskId()));
} else { } else {
listener.onFailure(e); listener.onFailure(e);

View File

@ -37,7 +37,10 @@ import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.fieldstats.FieldStatsAction; import org.elasticsearch.action.fieldstats.FieldStatsAction;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationActionTests;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
@ -46,10 +49,10 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.TaskResult;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.tasks.TaskResult;
import org.elasticsearch.tasks.TaskResultsService; import org.elasticsearch.tasks.TaskResultsService;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.tasks.MockTaskManager; import org.elasticsearch.test.tasks.MockTaskManager;
@ -71,7 +74,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
@ -326,48 +328,35 @@ public class TasksIT extends ESIntegTestCase {
} }
/** /**
* Very basic "is it plugged in" style test that indexes a document and * Very basic "is it plugged in" style test that indexes a document and makes sure that you can fetch the status of the process. The
* makes sure that you can fetch the status of the process. The goal here is * goal here is to verify that the large moving parts that make fetching task status work fit together rather than to verify any
* to verify that the large moving parts that make fetching task status work * particular status results from indexing. For that, look at {@link TransportReplicationActionTests}. We intentionally don't use the
* fit together rather than to verify any particular status results from * task recording mechanism used in other places in this test so we can make sure that the status fetching works properly over the wire.
* indexing. For that, look at
* {@link org.elasticsearch.action.support.replication.TransportReplicationActionTests}
* . We intentionally don't use the task recording mechanism used in other
* places in this test so we can make sure that the status fetching works
* properly over the wire.
*/ */
public void testCanFetchIndexStatus() throws InterruptedException, ExecutionException, IOException { public void testCanFetchIndexStatus() throws InterruptedException, ExecutionException, IOException {
/* /* We make sure all indexing tasks wait to start before this lock is *unlocked* so we can fetch their status with both the get and
* We prevent any tasks from unregistering until the test is done so we * list APIs. */
* can fetch them. This will gum up the server if we leave it enabled CountDownLatch taskRegistered = new CountDownLatch(1);
* but we'll be quick so it'll be OK (TM). CountDownLatch letTaskFinish = new CountDownLatch(1);
*/ ListenableActionFuture<IndexResponse> indexFuture = null;
ReentrantLock taskFinishLock = new ReentrantLock();
taskFinishLock.lock();
ListenableActionFuture<?> indexFuture = null;
try { try {
CountDownLatch taskRegistered = new CountDownLatch(1);
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) { for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
((MockTaskManager) transportService.getTaskManager()).addListener(new MockTaskManagerListener() { ((MockTaskManager) transportService.getTaskManager()).addListener(new MockTaskManagerListener() {
@Override @Override
public void onTaskRegistered(Task task) { public void onTaskRegistered(Task task) {
if (task.getAction().startsWith(IndexAction.NAME)) { if (task.getAction().startsWith(IndexAction.NAME)) {
taskRegistered.countDown(); taskRegistered.countDown();
logger.debug("Blocking [{}] starting", task);
try {
letTaskFinish.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} }
} }
@Override @Override
public void onTaskUnregistered(Task task) { public void onTaskUnregistered(Task task) {
/*
* We can't block all tasks here or the task listing task
* would never return.
*/
if (false == task.getAction().startsWith(IndexAction.NAME)) {
return;
}
logger.debug("Blocking {} from being unregistered", task);
taskFinishLock.lock();
taskFinishLock.unlock();
} }
@Override @Override
@ -390,16 +379,17 @@ public class TasksIT extends ESIntegTestCase {
assertEquals(task.getType(), fetchedWithGet.getType()); assertEquals(task.getType(), fetchedWithGet.getType());
assertEquals(task.getAction(), fetchedWithGet.getAction()); assertEquals(task.getAction(), fetchedWithGet.getAction());
assertEquals(task.getDescription(), fetchedWithGet.getDescription()); assertEquals(task.getDescription(), fetchedWithGet.getDescription());
// The status won't always be equal - it might change between the list and the get. assertEquals(task.getStatus(), fetchedWithGet.getStatus());
assertEquals(task.getStartTime(), fetchedWithGet.getStartTime()); assertEquals(task.getStartTime(), fetchedWithGet.getStartTime());
assertThat(fetchedWithGet.getRunningTimeNanos(), greaterThanOrEqualTo(task.getRunningTimeNanos())); assertThat(fetchedWithGet.getRunningTimeNanos(), greaterThanOrEqualTo(task.getRunningTimeNanos()));
assertEquals(task.isCancellable(), fetchedWithGet.isCancellable()); assertEquals(task.isCancellable(), fetchedWithGet.isCancellable());
assertEquals(task.getParentTaskId(), fetchedWithGet.getParentTaskId()); assertEquals(task.getParentTaskId(), fetchedWithGet.getParentTaskId());
} }
} finally { } finally {
taskFinishLock.unlock(); letTaskFinish.countDown();
if (indexFuture != null) { if (indexFuture != null) {
indexFuture.get(); IndexResponse indexResponse = indexFuture.get();
assertArrayEquals(ReplicationResponse.EMPTY, indexResponse.getShardInfo().getFailures());
} }
} }
} }