Fix up tasks integ test

I'd made some mistakes that hadn't caused the test to fail but did
slow it down and partially invalidate some of the assertions. This
fixes those mistakes.
This commit is contained in:
Nik Everett 2016-09-16 10:26:54 -04:00
parent 8ec94a4ba0
commit 697adfb3c4
1 changed files with 33 additions and 16 deletions

View File

@ -72,11 +72,11 @@ import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import static java.util.Collections.emptyList;
import static java.util.Collections.singleton;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
@ -333,12 +333,11 @@ public class TasksIT extends ESIntegTestCase {
* particular status results from indexing. For that, look at {@link TransportReplicationActionTests}. We intentionally don't use the
* task recording mechanism used in other places in this test so we can make sure that the status fetching works properly over the wire.
*/
public void testCanFetchIndexStatus() throws InterruptedException, ExecutionException, IOException {
/* We make sure all indexing tasks wait to start before this lock is *unlocked* so we can fetch their status with both the get and
* list APIs. */
public void testCanFetchIndexStatus() throws Exception {
// First latch waits for the task to start, second on blocks it from finishing.
CountDownLatch taskRegistered = new CountDownLatch(1);
CountDownLatch letTaskFinish = new CountDownLatch(1);
ListenableActionFuture<IndexResponse> indexFuture = null;
Thread index = null;
try {
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
((MockTaskManager) transportService.getTaskManager()).addListener(new MockTaskManagerListener() {
@ -348,7 +347,7 @@ public class TasksIT extends ESIntegTestCase {
taskRegistered.countDown();
logger.debug("Blocking [{}] starting", task);
try {
letTaskFinish.await(10, TimeUnit.SECONDS);
assertTrue(letTaskFinish.await(10, TimeUnit.SECONDS));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@ -364,8 +363,13 @@ public class TasksIT extends ESIntegTestCase {
}
});
}
indexFuture = client().prepareIndex("test", "test").setSource("test", "test").execute();
taskRegistered.await(10, TimeUnit.SECONDS); // waiting for at least one task to be registered
// Need to run the task in a separate thread because node client's .execute() is blocked by our task listener
index = new Thread(() -> {
IndexResponse indexResponse = client().prepareIndex("test", "test").setSource("test", "test").get();
assertArrayEquals(ReplicationResponse.EMPTY, indexResponse.getShardInfo().getFailures());
});
index.start();
assertTrue(taskRegistered.await(10, TimeUnit.SECONDS)); // waiting for at least one task to be registered
ListTasksResponse listResponse = client().admin().cluster().prepareListTasks().setActions("indices:data/write/index*")
.setDetailed(true).get();
@ -387,10 +391,13 @@ public class TasksIT extends ESIntegTestCase {
}
} finally {
letTaskFinish.countDown();
if (indexFuture != null) {
IndexResponse indexResponse = indexFuture.get();
assertArrayEquals(ReplicationResponse.EMPTY, indexResponse.getShardInfo().getFailures());
if (index != null) {
index.join();
}
assertBusy(() -> {
assertEquals(emptyList(),
client().admin().cluster().prepareListTasks().setActions("indices:data/write/index*").get().getTasks());
});
}
}
@ -439,6 +446,9 @@ public class TasksIT extends ESIntegTestCase {
}, response -> {
assertThat(response.getNodeFailures(), empty());
assertThat(response.getTaskFailures(), empty());
assertThat(response.getTasks(), hasSize(1));
TaskInfo task = response.getTasks().get(0);
assertEquals(TestTaskPlugin.TestTaskAction.NAME, task.getAction());
});
}
@ -446,10 +456,12 @@ public class TasksIT extends ESIntegTestCase {
waitForCompletionTestCase(false, id -> {
return client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute();
}, response -> {
assertNotNull(response.getTask().getTask());
assertTrue(response.getTask().isCompleted());
// We didn't store the result so it won't come back when we wait
assertNull(response.getTask().getResponse());
// But the task's details should still be there because we grabbed a reference to the task before waiting for it to complete.
assertNotNull(response.getTask().getTask());
assertEquals(TestTaskPlugin.TestTaskAction.NAME, response.getTask().getTask().getAction());
});
}
@ -457,10 +469,12 @@ public class TasksIT extends ESIntegTestCase {
waitForCompletionTestCase(true, id -> {
return client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute();
}, response -> {
assertNotNull(response.getTask().getTask());
assertTrue(response.getTask().isCompleted());
// We stored the task so we should get its results
assertEquals(0, response.getTask().getResponseAsMap().get("failure_count"));
// The task's details should also be there
assertNotNull(response.getTask().getTask());
assertEquals(TestTaskPlugin.TestTaskAction.NAME, response.getTask().getTask().getAction());
});
}
@ -490,6 +504,7 @@ public class TasksIT extends ESIntegTestCase {
((MockTaskManager) transportService.getTaskManager()).addListener(new MockTaskManagerListener() {
@Override
public void waitForTaskCompletion(Task task) {
waitForWaitingToStart.countDown();
}
@Override
@ -498,7 +513,6 @@ public class TasksIT extends ESIntegTestCase {
@Override
public void onTaskUnregistered(Task task) {
waitForWaitingToStart.countDown();
}
});
}
@ -506,7 +520,9 @@ public class TasksIT extends ESIntegTestCase {
// Spin up a request to wait for the test task to finish
waitResponseFuture = wait.apply(taskId);
// Wait for the wait to start
/* Wait for the wait to start. This should count down just *before* we wait for completion but after the list/get has got a
* reference to the running task. Because we unblock immediately after this the task may no longer be running for us to wait
* on which is fine. */
waitForWaitingToStart.await();
} finally {
// Unblock the request so the wait for completion request can finish
@ -517,7 +533,8 @@ public class TasksIT extends ESIntegTestCase {
T waitResponse = waitResponseFuture.get();
validator.accept(waitResponse);
future.get();
TestTaskPlugin.NodesResponse response = future.get();
assertEquals(emptyList(), response.failures());
}
public void testListTasksWaitForTimeout() throws Exception {