This commit is contained in:
fjy 2016-01-01 16:44:10 -08:00
parent fc2257489e
commit b5c094d951
2 changed files with 17 additions and 6 deletions

View File

@ -207,6 +207,11 @@ public class TaskQueue
} }
} }
public boolean isActive()
{
return active;
}
/** /**
* Main task runner management loop. Meant to run forever, or, at least until we're stopped. * Main task runner management loop. Meant to run forever, or, at least until we're stopped.
*/ */

View File

@ -430,7 +430,7 @@ public class TaskLifecycleTest
}; };
} }
}; };
setUpAndStartTaskQueue( setUpTaskQueue(
new DataSegmentPusher() new DataSegmentPusher()
{ {
@Override @Override
@ -449,7 +449,7 @@ public class TaskLifecycleTest
); );
} }
private void setUpAndStartTaskQueue(DataSegmentPusher dataSegmentPusher) private void setUpTaskQueue(DataSegmentPusher dataSegmentPusher) throws Exception
{ {
final TaskConfig taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null); final TaskConfig taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null);
tsqa = new TaskStorageQueryAdapter(ts); tsqa = new TaskStorageQueryAdapter(ts);
@ -536,13 +536,14 @@ public class TaskLifecycleTest
); );
tr = new ThreadPoolTaskRunner(tb, taskConfig, emitter); tr = new ThreadPoolTaskRunner(tb, taskConfig, emitter);
tq = new TaskQueue(tqc, ts, tr, tac, tl, emitter); tq = new TaskQueue(tqc, ts, tr, tac, tl, emitter);
tq.start();
} }
@After @After
public void tearDown() public void tearDown()
{ {
tq.stop(); if (tq.isActive()) {
tq.stop();
}
} }
@Test @Test
@ -864,7 +865,7 @@ public class TaskLifecycleTest
Assert.assertEquals("segments nuked", 0, mdc.getNuked().size()); Assert.assertEquals("segments nuked", 0, mdc.getNuked().size());
} }
@Test(timeout = 10000L) @Test(timeout = 60_000L)
public void testRealtimeIndexTask() throws Exception public void testRealtimeIndexTask() throws Exception
{ {
monitorScheduler.addMonitor(EasyMock.anyObject(Monitor.class)); monitorScheduler.addMonitor(EasyMock.anyObject(Monitor.class));
@ -876,6 +877,7 @@ public class TaskLifecycleTest
RealtimeIndexTask realtimeIndexTask = giveMeARealtimeIndexTask(); RealtimeIndexTask realtimeIndexTask = giveMeARealtimeIndexTask();
final String taskId = realtimeIndexTask.getId(); final String taskId = realtimeIndexTask.getId();
tq.start();
tq.add(realtimeIndexTask); tq.add(realtimeIndexTask);
//wait for task to process events and publish segment //wait for task to process events and publish segment
publishCountDown.await(); publishCountDown.await();
@ -910,7 +912,7 @@ public class TaskLifecycleTest
@Test(timeout = 60_000L) @Test(timeout = 60_000L)
public void testRealtimeIndexTaskFailure() throws Exception public void testRealtimeIndexTaskFailure() throws Exception
{ {
setUpAndStartTaskQueue( setUpTaskQueue(
new DataSegmentPusher() new DataSegmentPusher()
{ {
@Override @Override
@ -934,6 +936,8 @@ public class TaskLifecycleTest
RealtimeIndexTask realtimeIndexTask = giveMeARealtimeIndexTask(); RealtimeIndexTask realtimeIndexTask = giveMeARealtimeIndexTask();
final String taskId = realtimeIndexTask.getId(); final String taskId = realtimeIndexTask.getId();
tq.start();
tq.add(realtimeIndexTask); tq.add(realtimeIndexTask);
// Wait for realtime index task to fail // Wait for realtime index task to fail
@ -974,6 +978,7 @@ public class TaskLifecycleTest
final long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
// manually insert the task into TaskStorage, waiting for TaskQueue to sync from storage // manually insert the task into TaskStorage, waiting for TaskQueue to sync from storage
tq.start();
ts.insert(indexTask, TaskStatus.running(indexTask.getId())); ts.insert(indexTask, TaskStatus.running(indexTask.getId()));
while (tsqa.getStatus(indexTask.getId()).get().isRunnable()) { while (tsqa.getStatus(indexTask.getId()).get().isRunnable()) {
@ -1022,6 +1027,7 @@ public class TaskLifecycleTest
Preconditions.checkArgument(!task.getId().equals(dummyTask.getId())); Preconditions.checkArgument(!task.getId().equals(dummyTask.getId()));
tq.start();
tq.add(dummyTask); tq.add(dummyTask);
tq.add(task); tq.add(task);