diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java index 87d8bc89728..5eed155fd66 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java @@ -207,6 +207,11 @@ public class TaskQueue } } + public boolean isActive() + { + return active; + } + /** * Main task runner management loop. Meant to run forever, or, at least until we're stopped. */ diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index cf46ccf5e92..8b0f02a8479 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -430,7 +430,7 @@ public class TaskLifecycleTest }; } }; - setUpAndStartTaskQueue( + setUpTaskQueue( new DataSegmentPusher() { @Override @@ -449,7 +449,7 @@ public class TaskLifecycleTest ); } - private void setUpAndStartTaskQueue(DataSegmentPusher dataSegmentPusher) + private void setUpTaskQueue(DataSegmentPusher dataSegmentPusher) throws Exception { final TaskConfig taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null); tsqa = new TaskStorageQueryAdapter(ts); @@ -536,13 +536,14 @@ public class TaskLifecycleTest ); tr = new ThreadPoolTaskRunner(tb, taskConfig, emitter); tq = new TaskQueue(tqc, ts, tr, tac, tl, emitter); - tq.start(); } @After public void tearDown() { - tq.stop(); + if (tq.isActive()) { + tq.stop(); + } } @Test @@ -864,7 +865,7 @@ public class TaskLifecycleTest Assert.assertEquals("segments nuked", 0, mdc.getNuked().size()); } - @Test(timeout = 10000L) + @Test(timeout = 60_000L) public void testRealtimeIndexTask() throws Exception { monitorScheduler.addMonitor(EasyMock.anyObject(Monitor.class)); @@ -876,6 +877,7 @@ public class TaskLifecycleTest RealtimeIndexTask realtimeIndexTask = giveMeARealtimeIndexTask(); final String taskId = realtimeIndexTask.getId(); + tq.start(); tq.add(realtimeIndexTask); //wait for task to process events and publish segment publishCountDown.await(); @@ -910,7 +912,7 @@ public class TaskLifecycleTest @Test(timeout = 60_000L) public void testRealtimeIndexTaskFailure() throws Exception { - setUpAndStartTaskQueue( + setUpTaskQueue( new DataSegmentPusher() { @Override @@ -934,6 +936,8 @@ public class TaskLifecycleTest RealtimeIndexTask realtimeIndexTask = giveMeARealtimeIndexTask(); final String taskId = realtimeIndexTask.getId(); + + tq.start(); tq.add(realtimeIndexTask); // Wait for realtime index task to fail @@ -974,6 +978,7 @@ public class TaskLifecycleTest final long startTime = System.currentTimeMillis(); // manually insert the task into TaskStorage, waiting for TaskQueue to sync from storage + tq.start(); ts.insert(indexTask, TaskStatus.running(indexTask.getId())); while (tsqa.getStatus(indexTask.getId()).get().isRunnable()) { @@ -1022,6 +1027,7 @@ public class TaskLifecycleTest Preconditions.checkArgument(!task.getId().equals(dummyTask.getId())); + tq.start(); tq.add(dummyTask); tq.add(task);