diff --git a/server/src/main/java/io/druid/server/coordinator/CuratorLoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/CuratorLoadQueuePeon.java
index 51bf161f5f0..d794b82cea5 100644
--- a/server/src/main/java/io/druid/server/coordinator/CuratorLoadQueuePeon.java
+++ b/server/src/main/java/io/druid/server/coordinator/CuratorLoadQueuePeon.java
@@ -174,7 +174,7 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
       }
     }
 
-    log.info("Asking server peon[%s] to load segment[%s]", basePath, segment.getIdentifier());
+    log.debug("Asking server peon[%s] to load segment[%s]", basePath, segment.getIdentifier());
     queuedSize.addAndGet(segment.getSize());
     segmentsToLoad.put(segment, new SegmentHolder(segment, LOAD, Collections.singletonList(callback)));
   }
@@ -205,7 +205,7 @@
       }
     }
 
-    log.info("Asking server peon[%s] to drop segment[%s]", basePath, segment.getIdentifier());
+    log.debug("Asking server peon[%s] to drop segment[%s]", basePath, segment.getIdentifier());
     segmentsToDrop.put(segment, new SegmentHolder(segment, DROP, Collections.singletonList(callback)));
   }
@@ -244,16 +244,6 @@
     }
 
     try {
-      if (currentlyProcessing == null) {
-        if (!stopped) {
-          log.makeAlert("Crazy race condition! server[%s]", basePath)
-             .emit();
-        }
-        actionCompleted();
-        return;
-      }
-
-      log.info("Server[%s] processing segment[%s]", basePath, currentlyProcessing.getSegmentIdentifier());
       final String path = ZKPaths.makePath(basePath, currentlyProcessing.getSegmentIdentifier());
       final byte[] payload = jsonMapper.writeValueAsBytes(currentlyProcessing.getChangeRequest());
       curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
@@ -397,7 +387,12 @@
         return;
       }
       actionCompleted();
-      log.info("Server[%s] done processing [%s]", basePath, path);
+      log.info(
+          "Server[%s] done processing %s of segment [%s]",
+          basePath,
+          currentlyProcessing.getType() == LOAD ? "load" : "drop",
+          path
+      );
     }
   }
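Note: a single coordinator run can queue thousands of per-segment load and drop requests, so the two "Asking server peon" messages above move from INFO to DEBUG, while the completion message stays at INFO and now records whether a load or a drop finished. A minimal standalone sketch of that level split, using plain java.util.logging rather than Druid's EmittingLogger (the class and message names below are illustrative, not part of the patch):

import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LogLevelSketch
{
  private static final Logger LOG = Logger.getLogger("LoadQueuePeon");

  public static void main(String[] args)
  {
    List<String> segments = Arrays.asList("seg-2018-01-01", "seg-2018-01-02");
    for (String segment : segments) {
      // High-volume, per-segment chatter: debug (FINE), enabled only when needed.
      LOG.log(Level.FINE, "Asking server peon to load segment[{0}]", segment);
    }
    // Low-volume outcome of the cycle: info.
    LOG.log(Level.INFO, "Queued {0} segments for load", segments.size());
  }
}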
"load" : "drop", + path + ); } } diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index a4085048c22..f507e332e97 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -696,63 +696,58 @@ public class DruidCoordinator super( ImmutableList.of( new DruidCoordinatorSegmentInfoLoader(DruidCoordinator.this), - new DruidCoordinatorHelper() - { - @Override - public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) - { - // Display info about all historical servers - Iterable servers = FunctionalIterable - .create(serverInventoryView.getInventory()) - .filter(DruidServer::segmentReplicatable) - .transform(DruidServer::toImmutableDruidServer); + params -> { + // Display info about all historical servers + Iterable servers = FunctionalIterable + .create(serverInventoryView.getInventory()) + .filter(DruidServer::segmentReplicatable) + .transform(DruidServer::toImmutableDruidServer); - if (log.isDebugEnabled()) { - log.debug("Servers"); - for (ImmutableDruidServer druidServer : servers) { - log.debug(" %s", druidServer); - log.debug(" -- DataSources"); - for (ImmutableDruidDataSource druidDataSource : druidServer.getDataSources()) { - log.debug(" %s", druidDataSource); - } + if (log.isDebugEnabled()) { + log.debug("Servers"); + for (ImmutableDruidServer druidServer : servers) { + log.debug(" %s", druidServer); + log.debug(" -- DataSources"); + for (ImmutableDruidDataSource druidDataSource : druidServer.getDataSources()) { + log.debug(" %s", druidDataSource); } } - - // Find all historical servers, group them by subType and sort by ascending usage - final DruidCluster cluster = new DruidCluster(); - for (ImmutableDruidServer server : servers) { - if (!loadManagementPeons.containsKey(server.getName())) { - LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(server); - loadQueuePeon.start(); - log.info("Created LoadQueuePeon for server[%s].", server.getName()); - - loadManagementPeons.put(server.getName(), loadQueuePeon); - } - - cluster.add(new ServerHolder(server, loadManagementPeons.get(server.getName()))); - } - - segmentReplicantLookup = SegmentReplicantLookup.make(cluster); - - // Stop peons for servers that aren't there anymore. 
diff --git a/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java
index ece1d4884fa..1040d4d3e75 100644
--- a/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java
+++ b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java
@@ -390,7 +390,7 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
 
     SegmentHolder holder = segmentsToLoad.get(segment);
     if (holder == null) {
-      log.info("Server[%s] to load segment[%s] queued.", serverId, segment.getIdentifier());
+      log.debug("Server[%s] to load segment[%s] queued.", serverId, segment.getIdentifier());
       segmentsToLoad.put(segment, new LoadSegmentHolder(segment, callback));
       processingExecutor.execute(this::doSegmentManagement);
     } else {
@@ -415,7 +415,7 @@
 
     SegmentHolder holder = segmentsToDrop.get(segment);
     if (holder == null) {
-      log.info("Server[%s] to drop segment[%s] queued.", serverId, segment.getIdentifier());
+      log.debug("Server[%s] to drop segment[%s] queued.", serverId, segment.getIdentifier());
       segmentsToDrop.put(segment, new DropSegmentHolder(segment, callback));
       processingExecutor.execute(this::doSegmentManagement);
     } else {
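Note: same demotion as in CuratorLoadQueuePeon; the "queued" message fires once per segment, and only when the segment is not already pending. A rough sketch of that queue-if-absent shape (all names below are stand-ins, not the real HttpLoadQueuePeon API, and the real peon merges callbacks for duplicate requests rather than ignoring them):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SegmentQueue
{
  private final ConcurrentMap<String, Runnable> segmentsToLoad = new ConcurrentHashMap<>();
  private final ExecutorService processingExecutor = Executors.newSingleThreadExecutor();

  public void loadSegment(String segmentId, Runnable callback)
  {
    // Log and kick the worker only when the segment was not already queued;
    // duplicates are simply dropped here for brevity.
    if (segmentsToLoad.putIfAbsent(segmentId, callback) == null) {
      processingExecutor.execute(this::doSegmentManagement);
    }
  }

  private void doSegmentManagement()
  {
    segmentsToLoad.forEach((id, cb) -> cb.run());
    segmentsToLoad.clear();
  }

  public static void main(String[] args)
  {
    SegmentQueue queue = new SegmentQueue();
    queue.loadSegment("seg-2018-01-01", () -> System.out.println("load done"));
    queue.loadSegment("seg-2018-01-01", () -> System.out.println("ignored while still queued"));
    queue.processingExecutor.shutdown();
  }
}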
diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java
index 5471f3116aa..2eeef2b3cb7 100644
--- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java
+++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java
@@ -26,7 +26,6 @@ import io.druid.server.coordinator.CoordinatorStats;
 import io.druid.server.coordinator.DruidCluster;
 import io.druid.server.coordinator.DruidCoordinator;
 import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
-import io.druid.server.coordinator.LoadPeonCallback;
 import io.druid.server.coordinator.LoadQueuePeon;
 import io.druid.server.coordinator.ServerHolder;
 import io.druid.timeline.DataSegment;
@@ -74,15 +73,16 @@ public class DruidCoordinatorCleanupUnneeded implements DruidCoordinatorHelper
 
             if (!queuePeon.getSegmentsToDrop().contains(segment)) {
               queuePeon.dropSegment(
-                  segment, new LoadPeonCallback()
-                  {
-                    @Override
-                    public void execute()
-                    {
-                    }
+                  segment, () -> {
                   }
               );
               stats.addToTieredStat("unneededCount", server.getTier(), 1);
+              log.info(
+                  "Dropping unneeded segment [%s] from server [%s] in tier [%s]",
+                  segment.getIdentifier(),
+                  server.getName(),
+                  server.getTier()
+              );
             }
           }
         }
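Note: besides the new per-drop INFO line, this hunk replaces an empty anonymous LoadPeonCallback with a no-op lambda, which also lets the import go away. A small sketch of the equivalence, with Callback and dropSegment as hypothetical stand-ins:

interface Callback
{
  void execute();
}

public class NoopCallbackSketch
{
  static void dropSegment(String segmentId, Callback callback)
  {
    System.out.println("dropping " + segmentId);
    callback.execute();
  }

  public static void main(String[] args)
  {
    // The empty lambda body states "no completion action" in one token,
    // where the anonymous class needed several lines to say the same thing.
    dropSegment("seg-2018-01-01", () -> {});
  }
}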
diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java
index b8ea703c7a6..9dcee89bece 100644
--- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java
+++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java
@@ -20,6 +20,7 @@
 package io.druid.server.coordinator.rules;
 
 import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.server.coordinator.BalancerStrategy;
 import io.druid.server.coordinator.CoordinatorStats;
@@ -174,6 +175,13 @@ public abstract class LoadRule implements Rule
       }
 
       final String tier = entry.getKey();
+      String noAvailability = StringUtils.format(
+          "No available [%s] servers or node capacity to assign primary segment[%s]! Expected Replicants[%d]",
+          tier,
+          segment.getIdentifier(),
+          targetReplicantsInTier
+      );
+
       final List<ServerHolder> holders = getFilteredHolders(
           tier,
           params.getDruidCluster(),
@@ -181,16 +189,13 @@
       );
       // no holders satisfy the predicate
       if (holders.isEmpty()) {
+        log.warn(noAvailability);
        continue;
       }
 
       final ServerHolder candidate = params.getBalancerStrategy().findNewSegmentHomeReplicator(segment, holders);
       if (candidate == null) {
-        log.warn(
-            "No available [%s] servers or node capacity to assign primary segment[%s]! " +
-            "Expected Replicants[%d]",
-            tier, segment.getIdentifier(), targetReplicantsInTier
-        );
+        log.warn(noAvailability);
       } else {
         // cache the result for later use.
         strategyCache.put(tier, candidate);
@@ -204,6 +209,12 @@
     if (topCandidate != null) {
       // remove tier for primary replica
       strategyCache.remove(topCandidate.getServer().getTier());
+      log.info(
+          "Assigning 'primary' for segment [%s] to server [%s] in tier [%s]",
+          segment.getIdentifier(),
+          topCandidate.getServer().getName(),
+          topCandidate.getServer().getTier()
+      );
       topCandidate.getPeon().loadSegment(segment, null);
     }
@@ -225,6 +236,7 @@
     for (final Object2IntMap.Entry<String> entry : targetReplicants.object2IntEntrySet()) {
       final String tier = entry.getKey();
       if (tier.equals(tierToSkip)) {
+        log.info("Skipping replica assignment for tier [%s]", tier);
         continue;
       }
       final int numAssigned = assignReplicasForTier(
@@ -235,6 +247,7 @@
           createLoadQueueSizeLimitingPredicate(params),
           segment
       );
+      log.info("Assigned %d replicas in tier [%s]", numAssigned, tier);
       stats.addToTieredStat(ASSIGNED_COUNT, tier, numAssigned);
     }
   }
@@ -257,15 +270,24 @@
       return 0;
     }
 
+    String noAvailability = StringUtils.format(
+        "No available [%s] servers or node capacity to assign segment[%s]! Expected Replicants[%d]",
+        tier,
+        segment.getIdentifier(),
+        targetReplicantsInTier
+    );
+
     final List<ServerHolder> holders = getFilteredHolders(tier, params.getDruidCluster(), predicate);
     // if no holders available for assignment
     if (holders.isEmpty()) {
+      log.warn(noAvailability);
       return 0;
     }
 
     final ReplicationThrottler throttler = params.getReplicationManager();
     for (int numAssigned = 0; numAssigned < numToAssign; numAssigned++) {
       if (!throttler.canCreateReplicant(tier)) {
+        log.info("Throttling replication for segment [%s] in tier [%s]", segment.getIdentifier(), tier);
         return numAssigned;
       }
@@ -277,10 +299,7 @@
       }
 
       if (holder == null) {
-        log.warn(
-            "No available [%s] servers or node capacity to assign segment[%s]! Expected Replicants[%d]",
-            tier, segment.getIdentifier(), targetReplicantsInTier
-        );
+        log.warn(noAvailability);
         return numAssigned;
       }
       holders.remove(holder);
@@ -288,6 +307,12 @@
       final String segmentId = segment.getIdentifier();
       final String holderHost = holder.getServer().getHost();
       throttler.registerReplicantCreation(tier, segmentId, holderHost);
+      log.info(
+          "Assigning 'replica' for segment [%s] to server [%s] in tier [%s]",
+          segment.getIdentifier(),
+          holder.getServer().getName(),
+          holder.getServer().getTier()
+      );
       holder.getPeon().loadSegment(segment, () -> throttler.unregisterReplicantCreation(tier, segmentId, holderHost));
     }
@@ -307,6 +332,7 @@
     // This enforces that loading is completed before we attempt to drop stuffs as a safety measure.
     if (loadingInProgress(druidCluster)) {
+      log.info("Loading in progress, skipping drop until loading is complete");
       return;
     }
@@ -372,6 +398,12 @@
     final ServerHolder holder = iterator.next();
 
     if (holder.isServingSegment(segment)) {
+      log.info(
+          "Dropping segment [%s] on server [%s] in tier [%s]",
+          segment.getIdentifier(),
+          holder.getServer().getName(),
+          holder.getServer().getTier()
+      );
       holder.getPeon().dropSegment(segment, null);
       ++numDropped;
     } else {
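Note: the LoadRule changes hoist the twice-duplicated no-availability warning into a single noAvailability string built once per tier, so the empty-holders branch and the null-candidate branch now emit identical text. A standalone sketch of the pattern, with String.format standing in for Druid's StringUtils.format (both take %s/%d-style conversions):

public class NoAvailabilitySketch
{
  static int assign(String tier, String segmentId, int target, boolean holdersEmpty, boolean candidateFound)
  {
    // Built once, reused at every early-exit branch. It is formatted even on
    // the happy path, an acceptable cost for one short string per tier.
    String noAvailability = String.format(
        "No available [%s] servers or node capacity to assign segment[%s]! Expected Replicants[%d]",
        tier, segmentId, target
    );

    if (holdersEmpty) {
      System.err.println(noAvailability);
      return 0;
    }
    if (!candidateFound) {
      System.err.println(noAvailability);
      return 0;
    }
    return 1;
  }

  public static void main(String[] args)
  {
    assign("_default_tier", "seg-2018-01-01", 2, true, false);
  }
}

The new INFO lines for primary assignment, replica assignment, throttling, and drops give each segment a traceable decision history in the coordinator log, which is the stated point of this patch.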