From 6e33cd4e0adc340e352f80636d34bdc89054825a Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 6 Aug 2014 12:52:09 -0700 Subject: [PATCH] Coordinator drop/remove related stuff. - DruidCoordinatorCleanup should wait for availableSegments to populate before dropping any segments. - Clarify that millisToWaitBeforeDeleting applies to "removing" rather than "dropping" segments. - LoadRule shouldn't need to wait for the deletionWaitTime before dropping excess replicas. --- docs/content/Coordinator-Config.md | 2 +- .../helper/DruidCoordinatorCleanup.java | 40 ++++++++++--------- .../server/coordinator/rules/LoadRule.java | 10 ++--- 3 files changed, 27 insertions(+), 25 deletions(-) diff --git a/docs/content/Coordinator-Config.md b/docs/content/Coordinator-Config.md index 1221b043f6e..d1bc8fb49b0 100644 --- a/docs/content/Coordinator-Config.md +++ b/docs/content/Coordinator-Config.md @@ -51,7 +51,7 @@ Issuing a GET request at the same URL will return the spec that is currently in |Property|Description|Default| |--------|-----------|-------| -|`millisToWaitBeforeDeleting`|How long does the coordinator need to be active before it can start deleting segments.|90000 (15 mins)| +|`millisToWaitBeforeDeleting`|How long does the coordinator need to be active before it can start removing (marking unused) segments in metadata storage.|900000 (15 mins)| |`mergeBytesLimit`|The maximum number of bytes to merge (for segments).|524288000L| |`mergeSegmentsLimit`|The maximum number of segments that can be in a single [merge task](Tasks.html).|100| |`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|5| diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanup.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanup.java index e7684c80519..18c82f183c5 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanup.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanup.java @@ -63,27 +63,31 @@ public class DruidCoordinatorCleanup implements DruidCoordinatorHelper Set availableSegments = params.getAvailableSegments(); DruidCluster cluster = params.getDruidCluster(); - // Drop segments that no longer exist in the available segments configuration - for (MinMaxPriorityQueue serverHolders : cluster.getSortedServersByTier()) { - for (ServerHolder serverHolder : serverHolders) { - ImmutableDruidServer server = serverHolder.getServer(); + // Drop segments that no longer exist in the available segments configuration, if it has been populated. (It might + // not have been loaded yet since it's filled asynchronously. But it's also filled atomically, so if there are any + // segments at all, we should have all of them.) + if (!availableSegments.isEmpty()) { + for (MinMaxPriorityQueue serverHolders : cluster.getSortedServersByTier()) { + for (ServerHolder serverHolder : serverHolders) { + ImmutableDruidServer server = serverHolder.getServer(); - for (ImmutableDruidDataSource dataSource : server.getDataSources()) { - for (DataSegment segment : dataSource.getSegments()) { - if (!availableSegments.contains(segment)) { - LoadQueuePeon queuePeon = params.getLoadManagementPeons().get(server.getName()); + for (ImmutableDruidDataSource dataSource : server.getDataSources()) { + for (DataSegment segment : dataSource.getSegments()) { + if (!availableSegments.contains(segment)) { + LoadQueuePeon queuePeon = params.getLoadManagementPeons().get(server.getName()); - if (!queuePeon.getSegmentsToDrop().contains(segment)) { - queuePeon.dropSegment( - segment, new LoadPeonCallback() - { - @Override - public void execute() - { - } + if (!queuePeon.getSegmentsToDrop().contains(segment)) { + queuePeon.dropSegment( + segment, new LoadPeonCallback() + { + @Override + public void execute() + { + } + } + ); + stats.addToTieredStat("unneededCount", server.getTier(), 1); } - ); - stats.addToTieredStat("unneededCount", server.getTier(), 1); } } } diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java index cac7a5b8c28..ea917a1b438 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java @@ -35,6 +35,7 @@ import org.joda.time.DateTime; import java.util.List; import java.util.Map; +import java.util.Set; /** * LoadRules indicate the number of replicants a segment should have in a given tier. @@ -48,7 +49,8 @@ public abstract class LoadRule implements Rule @Override public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment) { - CoordinatorStats stats = new CoordinatorStats(); + final CoordinatorStats stats = new CoordinatorStats(); + final Set availableSegments = params.getAvailableSegments(); final Map loadStatus = Maps.newHashMap(); @@ -70,7 +72,7 @@ public abstract class LoadRule implements Rule final List serverHolderList = Lists.newArrayList(serverQueue); final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp(); final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp); - if (params.getAvailableSegments().contains(segment)) { + if (availableSegments.contains(segment)) { CoordinatorStats assignStats = assign( params.getReplicationManager(), tier, @@ -167,10 +169,6 @@ public abstract class LoadRule implements Rule { CoordinatorStats stats = new CoordinatorStats(); - if (!params.hasDeletionWaitTimeElapsed()) { - return stats; - } - // Make sure we have enough loaded replicants in the correct tiers in the cluster before doing anything for (Integer leftToLoad : loadStatus.values()) { if (leftToLoad > 0) {