Wait for any pending realtime task to complete before disabling datasource (#3757)

Noticed this in our internal testing, sometimes realtime index tasks in
kafkaIndexing service can get stuck waiting for handoff if datasource
is disabled before there task completion.
This is a workaround to ensure integration tests do not hit this case
until https://github.com/druid-io/druid/issues/1729 is fixed.
This commit is contained in:
Nishant 2016-12-07 23:47:16 +05:30 committed by Fangjin Yang
parent 06d0ef9c6c
commit 361af4c94f
1 changed files with 15 additions and 8 deletions

View File

@ -69,6 +69,9 @@ public abstract class AbstractIndexerTest
protected void unloadAndKillData(final String dataSource, String start, String end) throws Exception protected void unloadAndKillData(final String dataSource, String start, String end) throws Exception
{ {
// Wait for any existing index tasks to complete before disabling the datasource otherwise
// realtime tasks can get stuck waiting for handoff. https://github.com/druid-io/druid/issues/1729
waitForAllTasksToComplete();
Interval interval = new Interval(start + "/" + end); Interval interval = new Interval(start + "/" + end);
coordinator.unloadSegmentsForDataSource(dataSource, interval); coordinator.unloadSegmentsForDataSource(dataSource, interval);
RetryUtil.retryUntilFalse( RetryUtil.retryUntilFalse(
@ -82,16 +85,20 @@ public abstract class AbstractIndexerTest
}, "Segment Unloading" }, "Segment Unloading"
); );
coordinator.deleteSegmentsDataSource(dataSource, interval); coordinator.deleteSegmentsDataSource(dataSource, interval);
waitForAllTasksToComplete();
}
protected void waitForAllTasksToComplete(){
RetryUtil.retryUntilTrue( RetryUtil.retryUntilTrue(
new Callable<Boolean>() new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{ {
@Override return (indexer.getPendingTasks().size() + indexer.getRunningTasks().size() + indexer.getWaitingTasks()
public Boolean call() throws Exception .size()) == 0;
{ }
return (indexer.getPendingTasks().size() + indexer.getRunningTasks().size() + indexer.getWaitingTasks() }, "Waiting for Tasks Completion"
.size()) == 0;
}
}, "Waiting for Tasks Completion"
); );
} }