diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 0280045c3a9..d8d49b1a2ba 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -691,58 +691,63 @@ public class DruidCoordinator super( ImmutableList.of( new DruidCoordinatorSegmentInfoLoader(DruidCoordinator.this), - params -> { - // Display info about all historical servers - Iterable servers = FunctionalIterable - .create(serverInventoryView.getInventory()) - .filter(DruidServer::segmentReplicatable) - .transform(DruidServer::toImmutableDruidServer); + new DruidCoordinatorHelper() + { + @Override + public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) + { + // Display info about all historical servers + Iterable servers = FunctionalIterable + .create(serverInventoryView.getInventory()) + .filter(DruidServer::segmentReplicatable) + .transform(DruidServer::toImmutableDruidServer); - if (log.isDebugEnabled()) { - log.debug("Servers"); - for (ImmutableDruidServer druidServer : servers) { - log.debug(" %s", druidServer); - log.debug(" -- DataSources"); - for (ImmutableDruidDataSource druidDataSource : druidServer.getDataSources()) { - log.debug(" %s", druidDataSource); + if (log.isDebugEnabled()) { + log.debug("Servers"); + for (ImmutableDruidServer druidServer : servers) { + log.debug(" %s", druidServer); + log.debug(" -- DataSources"); + for (ImmutableDruidDataSource druidDataSource : druidServer.getDataSources()) { + log.debug(" %s", druidDataSource); + } } } - } - // Find all historical servers, group them by subType and sort by ascending usage - final DruidCluster cluster = new DruidCluster(); - for (ImmutableDruidServer server : servers) { - if (!loadManagementPeons.containsKey(server.getName())) { - LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(server); - loadQueuePeon.start(); - log.info("Created LoadQueuePeon for server[%s].", server.getName()); + // Find all historical servers, group them by subType and sort by ascending usage + final DruidCluster cluster = new DruidCluster(); + for (ImmutableDruidServer server : servers) { + if (!loadManagementPeons.containsKey(server.getName())) { + LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(server); + loadQueuePeon.start(); + log.info("Created LoadQueuePeon for server[%s].", server.getName()); - loadManagementPeons.put(server.getName(), loadQueuePeon); + loadManagementPeons.put(server.getName(), loadQueuePeon); + } + + cluster.add(new ServerHolder(server, loadManagementPeons.get(server.getName()))); } - cluster.add(new ServerHolder(server, loadManagementPeons.get(server.getName()))); - } + segmentReplicantLookup = SegmentReplicantLookup.make(cluster); - segmentReplicantLookup = SegmentReplicantLookup.make(cluster); + // Stop peons for servers that aren't there anymore. + final Set disappeared = Sets.newHashSet(loadManagementPeons.keySet()); + for (ImmutableDruidServer server : servers) { + disappeared.remove(server.getName()); + } + for (String name : disappeared) { + log.info("Removing listener for server[%s] which is no longer there.", name); + LoadQueuePeon peon = loadManagementPeons.remove(name); + peon.stop(); + } - // Stop peons for servers that aren't there anymore. - final Set disappeared = Sets.newHashSet(loadManagementPeons.keySet()); - for (ImmutableDruidServer server : servers) { - disappeared.remove(server.getName()); + return params.buildFromExisting() + .withDruidCluster(cluster) + .withDatabaseRuleManager(metadataRuleManager) + .withLoadManagementPeons(loadManagementPeons) + .withSegmentReplicantLookup(segmentReplicantLookup) + .withBalancerReferenceTimestamp(DateTimes.nowUtc()) + .build(); } - for (String name : disappeared) { - log.info("Removing listener for server[%s] which is no longer there.", name); - LoadQueuePeon peon = loadManagementPeons.remove(name); - peon.stop(); - } - - return params.buildFromExisting() - .withDruidCluster(cluster) - .withDatabaseRuleManager(metadataRuleManager) - .withLoadManagementPeons(loadManagementPeons) - .withSegmentReplicantLookup(segmentReplicantLookup) - .withBalancerReferenceTimestamp(DateTimes.nowUtc()) - .build(); }, new DruidCoordinatorRuleRunner(DruidCoordinator.this), new DruidCoordinatorCleanupUnneeded(DruidCoordinator.this),