fix spurious failure of RTR concurrency test (#2915)

This commit is contained in:
Himanshu 2016-05-04 12:30:20 -05:00 committed by Fangjin Yang
parent 44e52acfc0
commit 50065c8288
1 changed files with 66 additions and 37 deletions

View File

@ -23,9 +23,11 @@
package io.druid.indexing.overlord;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.ISE;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TestTasks;
import io.druid.indexing.common.task.Task;
import org.apache.zookeeper.ZooKeeper;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
@ -81,9 +83,7 @@ public class RemoteTaskRunnerRunPendingTasksConcurrencyTest
results[i] = (remoteTaskRunner.run(tasks[i]));
}
while (remoteTaskRunner.getWorkersWithUnacknowledgedTask().size() < 2) {
Thread.sleep(5);
}
waitForBothWorkersToHaveUnackedTasks();
//3 more tasks, all of which get queued up
for (int i = 2; i < 5; i++) {
@ -92,52 +92,40 @@ public class RemoteTaskRunnerRunPendingTasksConcurrencyTest
}
//simulate completion of task0 and task1
if (rtrTestUtils.taskAnnounced("worker0", tasks[0].getId())) {
rtrTestUtils.mockWorkerRunningTask("worker0", tasks[0]);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[0]);
rtrTestUtils.mockWorkerRunningTask("worker1", tasks[1]);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", tasks[1]);
} else {
rtrTestUtils.mockWorkerRunningTask("worker0", tasks[1]);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[1]);
rtrTestUtils.mockWorkerRunningTask("worker1", tasks[0]);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", tasks[0]);
}
mockWorkerRunningAndCompletionSuccessfulTasks(tasks[0], tasks[1]);
Assert.assertEquals(TaskStatus.Status.SUCCESS, results[0].get().getStatusCode());
Assert.assertEquals(TaskStatus.Status.SUCCESS, results[1].get().getStatusCode());
// now both threads race to run the last 3 tasks. task2 and task3 are being assigned
while (remoteTaskRunner.getWorkersWithUnacknowledgedTask().size() < 2) {
Thread.sleep(5);
}
waitForBothWorkersToHaveUnackedTasks();
//cancel task4, both executor threads should be able to ignore task4
remoteTaskRunner.shutdown("task4");
//simulate completion of task3 before task2 so that the executor thread with task2
//gets to task3 and ignores it
if (rtrTestUtils.taskAnnounced("worker0", tasks[3].getId())) {
rtrTestUtils.mockWorkerRunningTask("worker0", tasks[3]);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[3]);
rtrTestUtils.mockWorkerRunningTask("worker1", tasks[2]);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", tasks[2]);
if (remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[2].getId())
&& remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[3].getId())) {
remoteTaskRunner.shutdown("task4");
mockWorkerRunningAndCompletionSuccessfulTasks(tasks[3], tasks[2]);
Assert.assertEquals(TaskStatus.Status.SUCCESS, results[3].get().getStatusCode());
Assert.assertEquals(TaskStatus.Status.SUCCESS, results[2].get().getStatusCode());
} else if (remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[3].getId())
&& remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[4].getId())) {
remoteTaskRunner.shutdown("task2");
mockWorkerRunningAndCompletionSuccessfulTasks(tasks[4], tasks[3]);
Assert.assertEquals(TaskStatus.Status.SUCCESS, results[4].get().getStatusCode());
Assert.assertEquals(TaskStatus.Status.SUCCESS, results[3].get().getStatusCode());
} else if (remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[4].getId())
&& remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[2].getId())) {
remoteTaskRunner.shutdown("task3");
mockWorkerRunningAndCompletionSuccessfulTasks(tasks[4], tasks[2]);
Assert.assertEquals(TaskStatus.Status.SUCCESS, results[4].get().getStatusCode());
Assert.assertEquals(TaskStatus.Status.SUCCESS, results[2].get().getStatusCode());
} else {
rtrTestUtils.mockWorkerRunningTask("worker1", tasks[3]);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", tasks[3]);
rtrTestUtils.mockWorkerRunningTask("worker0", tasks[2]);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[2]);
throw new ISE("two out of three tasks 2,3 and 4 must be waiting for ack.");
}
Assert.assertEquals(TaskStatus.Status.SUCCESS, results[2].get().getStatusCode());
Assert.assertEquals(TaskStatus.Status.SUCCESS, results[3].get().getStatusCode());
//ensure that RTR is doing OK and still making progress
tasks[5] = TestTasks.unending("task5");
results[5] = remoteTaskRunner.run(tasks[5]);
while (remoteTaskRunner.getWorkersWithUnacknowledgedTask().size() < 1) {
Thread.sleep(5);
}
waitForOneWorkerToHaveUnackedTasks();
if (rtrTestUtils.taskAnnounced("worker0", tasks[5].getId())) {
rtrTestUtils.mockWorkerRunningTask("worker0", tasks[5]);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[5]);
@ -147,4 +135,45 @@ public class RemoteTaskRunnerRunPendingTasksConcurrencyTest
}
Assert.assertEquals(TaskStatus.Status.SUCCESS, results[5].get().getStatusCode());
}
private void mockWorkerRunningAndCompletionSuccessfulTasks(Task t1, Task t2) throws Exception
{
if (rtrTestUtils.taskAnnounced("worker0", t1.getId())) {
rtrTestUtils.mockWorkerRunningTask("worker0", t1);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", t1);
rtrTestUtils.mockWorkerRunningTask("worker1", t2);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", t2);
} else {
rtrTestUtils.mockWorkerRunningTask("worker1", t1);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", t1);
rtrTestUtils.mockWorkerRunningTask("worker0", t2);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", t2);
}
}
private void waitForOneWorkerToHaveUnackedTasks() throws Exception
{
while (remoteTaskRunner.getWorkersWithUnacknowledgedTask().size() < 1) {
Thread.sleep(5);
}
ZooKeeper zk = rtrTestUtils.getCuratorFramework().getZookeeperClient().getZooKeeper();
while (zk.getChildren(rtrTestUtils.tasksPath + "/worker0", false).size() < 1
&& zk.getChildren(rtrTestUtils.tasksPath + "/worker1", false).size() < 1) {
Thread.sleep(5);
}
}
private void waitForBothWorkersToHaveUnackedTasks() throws Exception
{
while (remoteTaskRunner.getWorkersWithUnacknowledgedTask().size() < 2) {
Thread.sleep(5);
}
ZooKeeper zk = rtrTestUtils.getCuratorFramework().getZookeeperClient().getZooKeeper();
while (zk.getChildren(rtrTestUtils.tasksPath + "/worker0", false).size() < 1
|| zk.getChildren(rtrTestUtils.tasksPath + "/worker1", false).size() < 1) {
Thread.sleep(5);
}
}
}