diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index 854c79ee097..3618d84a5a0 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -282,7 +282,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest private void forceTriggerAutoCompaction(int numExpectedSegmentsAfterCompaction) throws Exception { compactionResource.forceTriggerAutoCompaction(); - waitForAllTasksToComplete(); + waitForAllTasksToCompleteForDataSource(fullDatasourceName); verifySegmentsCount(numExpectedSegmentsAfterCompaction); ITRetryUtil.retryUntilTrue( () -> coordinator.areSegmentsLoaded(fullDatasourceName), diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java index 03df90eb99d..6811c32a2d7 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java @@ -25,6 +25,7 @@ import org.apache.commons.io.IOUtils; import org.apache.druid.guice.annotations.Json; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.clients.CoordinatorResourceTestClient; import org.apache.druid.testing.clients.OverlordResourceTestClient; @@ -83,7 +84,7 @@ public abstract class AbstractIndexerTest { // Wait for any existing index tasks to complete before disabling the datasource otherwise // realtime tasks can get stuck waiting for handoff. https://github.com/apache/druid/issues/1729 - waitForAllTasksToComplete(); + waitForAllTasksToCompleteForDataSource(dataSource); Interval interval = Intervals.of(start + "/" + end); coordinator.unloadSegmentsForDataSource(dataSource); ITRetryUtil.retryUntilFalse( @@ -97,19 +98,14 @@ public abstract class AbstractIndexerTest }, "Segment Unloading" ); coordinator.deleteSegmentsDataSource(dataSource, interval); - waitForAllTasksToComplete(); + waitForAllTasksToCompleteForDataSource(dataSource); } - protected void waitForAllTasksToComplete() + protected void waitForAllTasksToCompleteForDataSource(final String dataSource) { ITRetryUtil.retryUntilTrue( - () -> { - int numTasks = indexer.getPendingTasks().size() + - indexer.getRunningTasks().size() + - indexer.getWaitingTasks().size(); - return numTasks == 0; - }, - "Waiting for Tasks Completion" + () -> (indexer.getUncompletedTasksForDataSource(dataSource).size() == 0), + StringUtils.format("Waiting for all tasks of [%s] to complete", dataSource) ); }