mirror of https://github.com/apache/druid.git
commit
8451f21fed
|
@ -207,6 +207,11 @@ public class TaskQueue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isActive()
|
||||||
|
{
|
||||||
|
return active;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Main task runner management loop. Meant to run forever, or, at least until we're stopped.
|
* Main task runner management loop. Meant to run forever, or, at least until we're stopped.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -430,7 +430,7 @@ public class TaskLifecycleTest
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
setUpAndStartTaskQueue(
|
setUpTaskQueue(
|
||||||
new DataSegmentPusher()
|
new DataSegmentPusher()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
|
@ -449,7 +449,7 @@ public class TaskLifecycleTest
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setUpAndStartTaskQueue(DataSegmentPusher dataSegmentPusher)
|
private void setUpTaskQueue(DataSegmentPusher dataSegmentPusher) throws Exception
|
||||||
{
|
{
|
||||||
final TaskConfig taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null);
|
final TaskConfig taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null);
|
||||||
tsqa = new TaskStorageQueryAdapter(ts);
|
tsqa = new TaskStorageQueryAdapter(ts);
|
||||||
|
@ -536,13 +536,14 @@ public class TaskLifecycleTest
|
||||||
);
|
);
|
||||||
tr = new ThreadPoolTaskRunner(tb, taskConfig, emitter);
|
tr = new ThreadPoolTaskRunner(tb, taskConfig, emitter);
|
||||||
tq = new TaskQueue(tqc, ts, tr, tac, tl, emitter);
|
tq = new TaskQueue(tqc, ts, tr, tac, tl, emitter);
|
||||||
tq.start();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown()
|
public void tearDown()
|
||||||
{
|
{
|
||||||
tq.stop();
|
if (tq.isActive()) {
|
||||||
|
tq.stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -864,7 +865,7 @@ public class TaskLifecycleTest
|
||||||
Assert.assertEquals("segments nuked", 0, mdc.getNuked().size());
|
Assert.assertEquals("segments nuked", 0, mdc.getNuked().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 10000L)
|
@Test(timeout = 60_000L)
|
||||||
public void testRealtimeIndexTask() throws Exception
|
public void testRealtimeIndexTask() throws Exception
|
||||||
{
|
{
|
||||||
monitorScheduler.addMonitor(EasyMock.anyObject(Monitor.class));
|
monitorScheduler.addMonitor(EasyMock.anyObject(Monitor.class));
|
||||||
|
@ -876,6 +877,7 @@ public class TaskLifecycleTest
|
||||||
RealtimeIndexTask realtimeIndexTask = giveMeARealtimeIndexTask();
|
RealtimeIndexTask realtimeIndexTask = giveMeARealtimeIndexTask();
|
||||||
final String taskId = realtimeIndexTask.getId();
|
final String taskId = realtimeIndexTask.getId();
|
||||||
|
|
||||||
|
tq.start();
|
||||||
tq.add(realtimeIndexTask);
|
tq.add(realtimeIndexTask);
|
||||||
//wait for task to process events and publish segment
|
//wait for task to process events and publish segment
|
||||||
publishCountDown.await();
|
publishCountDown.await();
|
||||||
|
@ -910,7 +912,7 @@ public class TaskLifecycleTest
|
||||||
@Test(timeout = 60_000L)
|
@Test(timeout = 60_000L)
|
||||||
public void testRealtimeIndexTaskFailure() throws Exception
|
public void testRealtimeIndexTaskFailure() throws Exception
|
||||||
{
|
{
|
||||||
setUpAndStartTaskQueue(
|
setUpTaskQueue(
|
||||||
new DataSegmentPusher()
|
new DataSegmentPusher()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
|
@ -934,6 +936,8 @@ public class TaskLifecycleTest
|
||||||
|
|
||||||
RealtimeIndexTask realtimeIndexTask = giveMeARealtimeIndexTask();
|
RealtimeIndexTask realtimeIndexTask = giveMeARealtimeIndexTask();
|
||||||
final String taskId = realtimeIndexTask.getId();
|
final String taskId = realtimeIndexTask.getId();
|
||||||
|
|
||||||
|
tq.start();
|
||||||
tq.add(realtimeIndexTask);
|
tq.add(realtimeIndexTask);
|
||||||
|
|
||||||
// Wait for realtime index task to fail
|
// Wait for realtime index task to fail
|
||||||
|
@ -974,6 +978,7 @@ public class TaskLifecycleTest
|
||||||
final long startTime = System.currentTimeMillis();
|
final long startTime = System.currentTimeMillis();
|
||||||
|
|
||||||
// manually insert the task into TaskStorage, waiting for TaskQueue to sync from storage
|
// manually insert the task into TaskStorage, waiting for TaskQueue to sync from storage
|
||||||
|
tq.start();
|
||||||
ts.insert(indexTask, TaskStatus.running(indexTask.getId()));
|
ts.insert(indexTask, TaskStatus.running(indexTask.getId()));
|
||||||
|
|
||||||
while (tsqa.getStatus(indexTask.getId()).get().isRunnable()) {
|
while (tsqa.getStatus(indexTask.getId()).get().isRunnable()) {
|
||||||
|
@ -1022,6 +1027,7 @@ public class TaskLifecycleTest
|
||||||
|
|
||||||
Preconditions.checkArgument(!task.getId().equals(dummyTask.getId()));
|
Preconditions.checkArgument(!task.getId().equals(dummyTask.getId()));
|
||||||
|
|
||||||
|
tq.start();
|
||||||
tq.add(dummyTask);
|
tq.add(dummyTask);
|
||||||
tq.add(task);
|
tq.add(task);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue