diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 951d38ddf46..389afec54c1 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -247,6 +247,16 @@ public class DruidCoordinator return retVal; } + CountingMap getLoadPendingDatasources() { + final CountingMap retVal = new CountingMap<>(); + for (LoadQueuePeon peon : loadManagementPeons.values()) { + for (DataSegment segment : peon.getSegmentsToLoad()) { + retVal.add(segment.getDataSource(), 1); + } + } + return retVal; + } + public Map getLoadStatus() { Map loadStatus = Maps.newHashMap(); diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java index 5d1e315fb0e..7408afded84 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java @@ -338,6 +338,10 @@ public class DruidCoordinatorTest extends CuratorTestBase Assert.assertEquals(1, segmentAvailability.size()); Assert.assertEquals(0l, segmentAvailability.get(dataSource)); + while (coordinator.getLoadPendingDatasources().get(dataSource).get() > 0) { + Thread.sleep(50); + } + Map> replicationStatus = coordinator.getReplicationStatus(); Assert.assertNotNull(replicationStatus); Assert.assertEquals(1, replicationStatus.entrySet().size());