diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java index 6f77215834a..66cd9e73288 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java @@ -23,9 +23,11 @@ package io.druid.indexing.overlord; import com.google.common.util.concurrent.ListenableFuture; +import com.metamx.common.ISE; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TestTasks; import io.druid.indexing.common.task.Task; +import org.apache.zookeeper.ZooKeeper; import org.joda.time.Period; import org.junit.After; import org.junit.Assert; @@ -81,9 +83,7 @@ public class RemoteTaskRunnerRunPendingTasksConcurrencyTest results[i] = (remoteTaskRunner.run(tasks[i])); } - while (remoteTaskRunner.getWorkersWithUnacknowledgedTask().size() < 2) { - Thread.sleep(5); - } + waitForBothWorkersToHaveUnackedTasks(); //3 more tasks, all of which get queued up for (int i = 2; i < 5; i++) { @@ -92,52 +92,40 @@ public class RemoteTaskRunnerRunPendingTasksConcurrencyTest } //simulate completion of task0 and task1 - if (rtrTestUtils.taskAnnounced("worker0", tasks[0].getId())) { - rtrTestUtils.mockWorkerRunningTask("worker0", tasks[0]); - rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[0]); - rtrTestUtils.mockWorkerRunningTask("worker1", tasks[1]); - rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", tasks[1]); - } else { - rtrTestUtils.mockWorkerRunningTask("worker0", tasks[1]); - rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[1]); - rtrTestUtils.mockWorkerRunningTask("worker1", tasks[0]); - rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", tasks[0]); - } + mockWorkerRunningAndCompletionSuccessfulTasks(tasks[0], tasks[1]); Assert.assertEquals(TaskStatus.Status.SUCCESS, results[0].get().getStatusCode()); Assert.assertEquals(TaskStatus.Status.SUCCESS, results[1].get().getStatusCode()); // now both threads race to run the last 3 tasks. task2 and task3 are being assigned - while (remoteTaskRunner.getWorkersWithUnacknowledgedTask().size() < 2) { - Thread.sleep(5); - } + waitForBothWorkersToHaveUnackedTasks(); - //cancel task4, both executor threads should be able to ignore task4 - remoteTaskRunner.shutdown("task4"); - - //simulate completion of task3 before task2 so that the executor thread with task2 - //gets to task3 and ignores it - if (rtrTestUtils.taskAnnounced("worker0", tasks[3].getId())) { - rtrTestUtils.mockWorkerRunningTask("worker0", tasks[3]); - rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[3]); - rtrTestUtils.mockWorkerRunningTask("worker1", tasks[2]); - rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", tasks[2]); + if (remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[2].getId()) + && remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[3].getId())) { + remoteTaskRunner.shutdown("task4"); + mockWorkerRunningAndCompletionSuccessfulTasks(tasks[3], tasks[2]); + Assert.assertEquals(TaskStatus.Status.SUCCESS, results[3].get().getStatusCode()); + Assert.assertEquals(TaskStatus.Status.SUCCESS, results[2].get().getStatusCode()); + } else if (remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[3].getId()) + && remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[4].getId())) { + remoteTaskRunner.shutdown("task2"); + mockWorkerRunningAndCompletionSuccessfulTasks(tasks[4], tasks[3]); + Assert.assertEquals(TaskStatus.Status.SUCCESS, results[4].get().getStatusCode()); + Assert.assertEquals(TaskStatus.Status.SUCCESS, results[3].get().getStatusCode()); + } else if (remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[4].getId()) + && remoteTaskRunner.getWorkersWithUnacknowledgedTask().containsValue(tasks[2].getId())) { + remoteTaskRunner.shutdown("task3"); + mockWorkerRunningAndCompletionSuccessfulTasks(tasks[4], tasks[2]); + Assert.assertEquals(TaskStatus.Status.SUCCESS, results[4].get().getStatusCode()); + Assert.assertEquals(TaskStatus.Status.SUCCESS, results[2].get().getStatusCode()); } else { - rtrTestUtils.mockWorkerRunningTask("worker1", tasks[3]); - rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", tasks[3]); - rtrTestUtils.mockWorkerRunningTask("worker0", tasks[2]); - rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[2]); + throw new ISE("two out of three tasks 2,3 and 4 must be waiting for ack."); } - Assert.assertEquals(TaskStatus.Status.SUCCESS, results[2].get().getStatusCode()); - Assert.assertEquals(TaskStatus.Status.SUCCESS, results[3].get().getStatusCode()); - //ensure that RTR is doing OK and still making progress tasks[5] = TestTasks.unending("task5"); results[5] = remoteTaskRunner.run(tasks[5]); - while (remoteTaskRunner.getWorkersWithUnacknowledgedTask().size() < 1) { - Thread.sleep(5); - } + waitForOneWorkerToHaveUnackedTasks(); if (rtrTestUtils.taskAnnounced("worker0", tasks[5].getId())) { rtrTestUtils.mockWorkerRunningTask("worker0", tasks[5]); rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[5]); @@ -147,4 +135,45 @@ public class RemoteTaskRunnerRunPendingTasksConcurrencyTest } Assert.assertEquals(TaskStatus.Status.SUCCESS, results[5].get().getStatusCode()); } + + private void mockWorkerRunningAndCompletionSuccessfulTasks(Task t1, Task t2) throws Exception + { + if (rtrTestUtils.taskAnnounced("worker0", t1.getId())) { + rtrTestUtils.mockWorkerRunningTask("worker0", t1); + rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", t1); + rtrTestUtils.mockWorkerRunningTask("worker1", t2); + rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", t2); + } else { + rtrTestUtils.mockWorkerRunningTask("worker1", t1); + rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker1", t1); + rtrTestUtils.mockWorkerRunningTask("worker0", t2); + rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", t2); + } + } + + private void waitForOneWorkerToHaveUnackedTasks() throws Exception + { + while (remoteTaskRunner.getWorkersWithUnacknowledgedTask().size() < 1) { + Thread.sleep(5); + } + + ZooKeeper zk = rtrTestUtils.getCuratorFramework().getZookeeperClient().getZooKeeper(); + while (zk.getChildren(rtrTestUtils.tasksPath + "/worker0", false).size() < 1 + && zk.getChildren(rtrTestUtils.tasksPath + "/worker1", false).size() < 1) { + Thread.sleep(5); + } + } + + private void waitForBothWorkersToHaveUnackedTasks() throws Exception + { + while (remoteTaskRunner.getWorkersWithUnacknowledgedTask().size() < 2) { + Thread.sleep(5); + } + + ZooKeeper zk = rtrTestUtils.getCuratorFramework().getZookeeperClient().getZooKeeper(); + while (zk.getChildren(rtrTestUtils.tasksPath + "/worker0", false).size() < 1 + || zk.getChildren(rtrTestUtils.tasksPath + "/worker1", false).size() < 1) { + Thread.sleep(5); + } + } }