mirror of https://github.com/apache/druid.git
Improve "waiting for tasks complete" logic in integration tests (#9759)
* improve waiting for tasks complete logic in integration tests * improve waiting for tasks complete logic in integration tests * fix forbidden check
This commit is contained in:
parent
a107ee3ed2
commit
6bc64b731f
|
@ -282,7 +282,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
|
|||
private void forceTriggerAutoCompaction(int numExpectedSegmentsAfterCompaction) throws Exception
|
||||
{
|
||||
compactionResource.forceTriggerAutoCompaction();
|
||||
waitForAllTasksToComplete();
|
||||
waitForAllTasksToCompleteForDataSource(fullDatasourceName);
|
||||
verifySegmentsCount(numExpectedSegmentsAfterCompaction);
|
||||
ITRetryUtil.retryUntilTrue(
|
||||
() -> coordinator.areSegmentsLoaded(fullDatasourceName),
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.commons.io.IOUtils;
|
|||
import org.apache.druid.guice.annotations.Json;
|
||||
import org.apache.druid.guice.annotations.Smile;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.testing.IntegrationTestingConfig;
|
||||
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
|
||||
import org.apache.druid.testing.clients.OverlordResourceTestClient;
|
||||
|
@ -83,7 +84,7 @@ public abstract class AbstractIndexerTest
|
|||
{
|
||||
// Wait for any existing index tasks to complete before disabling the datasource otherwise
|
||||
// realtime tasks can get stuck waiting for handoff. https://github.com/apache/druid/issues/1729
|
||||
waitForAllTasksToComplete();
|
||||
waitForAllTasksToCompleteForDataSource(dataSource);
|
||||
Interval interval = Intervals.of(start + "/" + end);
|
||||
coordinator.unloadSegmentsForDataSource(dataSource);
|
||||
ITRetryUtil.retryUntilFalse(
|
||||
|
@ -97,19 +98,14 @@ public abstract class AbstractIndexerTest
|
|||
}, "Segment Unloading"
|
||||
);
|
||||
coordinator.deleteSegmentsDataSource(dataSource, interval);
|
||||
waitForAllTasksToComplete();
|
||||
waitForAllTasksToCompleteForDataSource(dataSource);
|
||||
}
|
||||
|
||||
protected void waitForAllTasksToComplete()
|
||||
protected void waitForAllTasksToCompleteForDataSource(final String dataSource)
|
||||
{
|
||||
ITRetryUtil.retryUntilTrue(
|
||||
() -> {
|
||||
int numTasks = indexer.getPendingTasks().size() +
|
||||
indexer.getRunningTasks().size() +
|
||||
indexer.getWaitingTasks().size();
|
||||
return numTasks == 0;
|
||||
},
|
||||
"Waiting for Tasks Completion"
|
||||
() -> (indexer.getUncompletedTasksForDataSource(dataSource).size() == 0),
|
||||
StringUtils.format("Waiting for all tasks of [%s] to complete", dataSource)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue