diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java
index 80172c432fa..aa4130b4257 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java
@@ -62,6 +62,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ScheduledExecutorService;

 /**
@@ -622,6 +623,7 @@ public class DruidMaster

         // Find all historical servers, group them by subType and sort by ascending usage
         final DruidCluster cluster = new DruidCluster();
+        final Map<String, ConcurrentSkipListSet<String>> currentlyReplicatingSegments;
         for (DruidServer server : servers) {
           if (!loadManagementPeons.containsKey(server.getName())) {
             String basePath = yp.combineParts(Arrays.asList(config.getLoadQueuePath(), server.getName()));
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterReplicationManager.java b/server/src/main/java/com/metamx/druid/master/DruidMasterReplicationManager.java
new file mode 100644
index 00000000000..1ee479933b9
--- /dev/null
+++ b/server/src/main/java/com/metamx/druid/master/DruidMasterReplicationManager.java
@@ -0,0 +1,148 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.master;
+
+import com.google.common.collect.Maps;
+import com.metamx.emitter.EmittingLogger;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+/**
+ * The DruidMasterReplicationManager throttles the number of segment replicants that are created and destroyed, per tier, during each master run.
+ */
+public class DruidMasterReplicationManager
+{
+  private static final EmittingLogger log = new EmittingLogger(DruidMasterReplicationManager.class);
+  private final int maxReplicants;
+  private final int maxLifetime;
+
+  private final Map<String, Boolean> replicatingLookup = Maps.newHashMap();
+  private final Map<String, Boolean> terminatingLookup = Maps.newHashMap();
+  private final ReplicatorSegmentHolder currentlyReplicating = new ReplicatorSegmentHolder();
+  private final ReplicatorSegmentHolder currentlyTerminating = new ReplicatorSegmentHolder();
+
+  public DruidMasterReplicationManager(int maxReplicants, int maxLifetime)
+  {
+    this.maxReplicants = maxReplicants;
+    this.maxLifetime = maxLifetime;
+  }
+
+  public void updateReplicationState(String tier)
+  {
+    update(tier, currentlyReplicating, replicatingLookup, "create");
+  }
+
+  public void updateTerminationState(String tier)
+  {
+    update(tier, currentlyTerminating, terminatingLookup, "terminate");
+  }
+
+  private void update(String tier, ReplicatorSegmentHolder holder, Map<String, Boolean> lookup, String type)
+  {
+    int size = holder.getNumProcessing(tier);
+    if (size != 0) {
+      log.info("[%s]: Replicant %s queue still has %d segments", tier, type, size);
+      holder.reduceLifetime();
+
+      if (holder.getLifetime() < 0) {
+        log.makeAlert("[%s]: Replicant %s queue stuck after %d+ runs!", tier, type, maxLifetime).emit();
+      }
+      lookup.put(tier, false);
+      return; // leave the tier throttled until its queue drains
+    }
+
+    lookup.put(tier, true);
+  }
+
+  public boolean canAddReplicant(String tier)
+  {
+    return replicatingLookup.get(tier);
+  }
+
+  public boolean canDestroyReplicant(String tier)
+  {
+    return terminatingLookup.get(tier);
+  }
+
+  public boolean registerReplicantCreation(String tier, String segmentId)
+  {
+    return currentlyReplicating.addSegment(tier, segmentId);
+  }
+
+  public void unregisterReplicantCreation(String tier, String segmentId)
+  {
+    currentlyReplicating.removeSegment(tier, segmentId);
+  }
+
+  public boolean registerReplicantTermination(String tier, String segmentId)
+  {
+    return currentlyTerminating.addSegment(tier, segmentId);
+  }
+
+  public void unregisterReplicantTermination(String tier, String segmentId)
+  {
+    currentlyTerminating.removeSegment(tier, segmentId);
+  }
+
+  private class ReplicatorSegmentHolder
+  {
+    private final Map<String, Set<String>> currentlyProcessingSegments = Maps.newHashMap();
+    private volatile int lifetime = maxLifetime;
+
+    public boolean addSegment(String tier, String segmentId)
+    {
+      if (getNumProcessing(tier) < maxReplicants) {
+        Set<String> segments = currentlyProcessingSegments.get(tier);
+        if (segments == null) {
+          segments = new ConcurrentSkipListSet<String>();
+          currentlyProcessingSegments.put(tier, segments);
+        }
+        segments.add(segmentId);
+        return true;
+      }
+
+      return false;
+    }
+
+    public void removeSegment(String tier, String segmentId)
+    {
+      Set<String> segments = currentlyProcessingSegments.get(tier);
+      if (segments != null) {
+        segments.remove(segmentId);
+      }
+    }
+
+    public int getNumProcessing(String tier)
+    {
+      Set<String> segments = currentlyProcessingSegments.get(tier);
+      return (segments == null) ? 0 : segments.size();
+    }
+
+    public int getLifetime()
+    {
+      return lifetime;
+    }
+
+    public void reduceLifetime()
+    {
+      lifetime--;
+    }
+  }
+}
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterRuleRunner.java b/server/src/main/java/com/metamx/druid/master/DruidMasterRuleRunner.java
index 5c33a257f26..96e181ce9b9 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMasterRuleRunner.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMasterRuleRunner.java
@@ -33,6 +33,8 @@ public class DruidMasterRuleRunner implements DruidMasterHelper
 {
   private static final EmittingLogger log = new EmittingLogger(DruidMasterRuleRunner.class);

+  private final DruidMasterReplicationManager replicationManager = new DruidMasterReplicationManager(10, 15);
+
   private final DruidMaster master;

   public DruidMasterRuleRunner(DruidMaster master)
@@ -51,6 +53,15 @@ public class DruidMasterRuleRunner implements DruidMasterHelper
       return params;
     }

+    for (String tier : params.getDruidCluster().getTierNames()) {
+      replicationManager.updateReplicationState(tier);
+      replicationManager.updateTerminationState(tier);
+    }
+
+    DruidMasterRuntimeParams paramsWithReplicationManager = params.buildFromExisting()
+                                                                  .withReplicationManager(replicationManager)
+                                                                  .build();
+
     // Run through all matched rules for available segments
     DateTime now = new DateTime();
     DatabaseRuleManager databaseRuleManager = params.getDatabaseRuleManager();
@@ -76,8 +87,8 @@ public class DruidMasterRuleRunner implements DruidMasterHelper
       }
     }

-    return params.buildFromExisting()
-                 .withMasterStats(stats)
-                 .build();
+    return paramsWithReplicationManager.buildFromExisting()
+                                       .withMasterStats(stats)
+                                       .build();
   }
 }
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java b/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java
index a8658937e68..1cd6de64ca3 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java
@@ -25,13 +25,13 @@ import com.metamx.common.guava.Comparators;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.client.DruidDataSource;
 import com.metamx.druid.db.DatabaseRuleManager;
-import com.metamx.druid.master.rules.RuleMap;
 import com.metamx.emitter.service.ServiceEmitter;

 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;

 /**
  */
@@ -44,6 +44,7 @@ public class DruidMasterRuntimeParams
   private final Set<DruidDataSource> dataSources;
   private final Set<DataSegment> availableSegments;
   private final Map<String, LoadQueuePeon> loadManagementPeons;
+  private final DruidMasterReplicationManager replicationManager;
   private final ServiceEmitter emitter;
   private final long millisToWaitBeforeDeleting;
   private final MasterStats stats;
@@ -58,6 +59,7 @@
       Set<DruidDataSource> dataSources,
       Set<DataSegment> availableSegments,
       Map<String, LoadQueuePeon> loadManagementPeons,
+      DruidMasterReplicationManager replicationManager,
       ServiceEmitter emitter,
       long millisToWaitBeforeDeleting,
       MasterStats stats,
@@ -72,6 +74,7 @@
     this.dataSources = dataSources;
     this.availableSegments = availableSegments;
     this.loadManagementPeons = loadManagementPeons;
+    this.replicationManager = replicationManager;
     this.emitter = emitter;
     this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
     this.stats = stats;
@@ -114,6 +117,11 @@ public class DruidMasterRuntimeParams
     return loadManagementPeons;
   }

+  public DruidMasterReplicationManager getReplicationManager()
+  {
+    return replicationManager;
+  }
+
   public ServiceEmitter getEmitter()
   {
     return emitter;
@@ -159,6 +167,7 @@
         dataSources,
         availableSegments,
         loadManagementPeons,
+        replicationManager,
         emitter,
         millisToWaitBeforeDeleting,
         stats,
@@ -176,6 +185,7 @@
     private final Set<DruidDataSource> dataSources;
     private final Set<DataSegment> availableSegments;
     private final Map<String, LoadQueuePeon> loadManagementPeons;
+    private DruidMasterReplicationManager replicationManager;
     private ServiceEmitter emitter;
     private long millisToWaitBeforeDeleting;
     private MasterStats stats;
@@ -191,6 +201,7 @@
       this.dataSources = Sets.newHashSet();
       this.availableSegments = Sets.newTreeSet(Comparators.inverse(DataSegment.bucketMonthComparator()));
       this.loadManagementPeons = Maps.newHashMap();
+      this.replicationManager = null;
       this.emitter = null;
       this.millisToWaitBeforeDeleting = 0;
       this.stats = new MasterStats();
@@ -206,6 +217,7 @@
         Set<DruidDataSource> dataSources,
         Set<DataSegment> availableSegments,
         Map<String, LoadQueuePeon> loadManagementPeons,
+        DruidMasterReplicationManager replicationManager,
         ServiceEmitter emitter,
         long millisToWaitBeforeDeleting,
         MasterStats stats,
@@ -220,6 +232,7 @@
       this.dataSources = dataSources;
       this.availableSegments = availableSegments;
       this.loadManagementPeons = loadManagementPeons;
+      this.replicationManager = replicationManager;
       this.emitter = emitter;
       this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
       this.stats = stats;
@@ -237,6 +250,7 @@
           dataSources,
           availableSegments,
           loadManagementPeons,
+          replicationManager,
           emitter,
           millisToWaitBeforeDeleting,
           stats,
@@ -287,6 +301,12 @@
       return this;
     }

+    public Builder withReplicationManager(DruidMasterReplicationManager replicationManager)
+    {
+      this.replicationManager = replicationManager;
+      return this;
+    }
+
     public Builder withEmitter(ServiceEmitter emitter)
     {
       this.emitter = emitter;
diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
index 5bf232c9474..488019fffeb 100644
--- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
+++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.MinMaxPriorityQueue;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.master.DruidMaster;
+import com.metamx.druid.master.DruidMasterReplicationManager;
 import com.metamx.druid.master.DruidMasterRuntimeParams;
 import com.metamx.druid.master.LoadPeonCallback;
 import com.metamx.druid.master.MasterStats;
@@ -55,17 +56,18 @@ public abstract class LoadRule implements Rule
       return stats;
     }

-    stats.accumulate(assign(expectedReplicants, totalReplicants, serverQueue, segment));
+    stats.accumulate(assign(params.getReplicationManager(), expectedReplicants, totalReplicants, serverQueue, segment));
     stats.accumulate(drop(expectedReplicants, clusterReplicants, segment, params));

     return stats;
   }

   private MasterStats assign(
+      final DruidMasterReplicationManager replicationManager,
       int expectedReplicants,
       int totalReplicants,
       MinMaxPriorityQueue<ServerHolder> serverQueue,
-      DataSegment segment
+      final DataSegment segment
   )
   {
     MasterStats stats = new MasterStats();
@@ -109,6 +111,13 @@ public abstract class LoadRule implements Rule
         break;
       }

+      if (totalReplicants > 0) { // don't throttle if there's only 1 copy of this segment in the cluster
+        if (!replicationManager.canAddReplicant(getTier()) ||
+            !replicationManager.registerReplicantCreation(getTier(), segment.getIdentifier())) {
+          break;
+        }
+      }
+
       holder.getPeon().loadSegment(
           segment,
           new LoadPeonCallback()
@@ -116,6 +125,7 @@ public abstract class LoadRule implements Rule
             @Override
             protected void execute()
             {
+              replicationManager.unregisterReplicantCreation(getTier(), segment.getIdentifier());
             }
           }
       );
@@ -132,11 +142,12 @@ public abstract class LoadRule implements Rule
   private MasterStats drop(
       int expectedReplicants,
       int clusterReplicants,
-      DataSegment segment,
-      DruidMasterRuntimeParams params
+      final DataSegment segment,
+      final DruidMasterRuntimeParams params
   )
   {
     MasterStats stats = new MasterStats();
+    final DruidMasterReplicationManager replicationManager = params.getReplicationManager();

     if (!params.hasDeletionWaitTimeElapsed()) {
       return stats;
@@ -168,6 +179,13 @@ public abstract class LoadRule implements Rule
         break;
       }

+      if (actualNumReplicantsForType > 1) { // don't throttle unless we are removing extra replicants
+        if (!replicationManager.canDestroyReplicant(getTier()) ||
+            !replicationManager.registerReplicantTermination(getTier(), segment.getIdentifier())) {
+          break;
+        }
+      }
+
       if (holder.isServingSegment(segment)) {
         holder.getPeon().dropSegment(
             segment,
@@ -176,6 +194,7 @@ public abstract class LoadRule implements Rule
               @Override
               protected void execute()
               {
+                replicationManager.unregisterReplicantTermination(getTier(), segment.getIdentifier());
               }
             }
         );
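
Reviewer note on intended usage: DruidMasterRuleRunner calls updateReplicationState(tier) and updateTerminationState(tier) once per tier at the start of every master run, which opens or closes the per-tier throttle depending on whether the previous run's replicants are still sitting in the load/drop queues. LoadRule then consults canAddReplicant/registerReplicantCreation (and the termination counterparts) before queueing work, and unregisters the segment from the LoadPeonCallback once the peon finishes. The sketch below is a minimal, self-contained model of that cycle using plain collections rather than the Druid classes; the ReplicationThrottleDemo name and MAX_REPLICANTS constant are illustrative only, and the lifetime/alert bookkeeping is omitted for brevity.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;

// Simplified stand-in for the creation side of DruidMasterReplicationManager.
public class ReplicationThrottleDemo
{
  private static final int MAX_REPLICANTS = 10; // illustrative; the patch hard-codes 10 in DruidMasterRuleRunner

  private final Map<String, Boolean> replicatingLookup = new HashMap<String, Boolean>();
  private final Map<String, Set<String>> inFlight = new HashMap<String, Set<String>>();

  // Mirrors updateReplicationState(tier): a tier may create replicants only if
  // everything it queued on previous runs has finished loading.
  public void update(String tier)
  {
    Set<String> segments = inFlight.get(tier);
    replicatingLookup.put(tier, segments == null || segments.isEmpty());
  }

  // Mirrors canAddReplicant(tier) followed by registerReplicantCreation(tier, segmentId).
  public boolean tryRegister(String tier, String segmentId)
  {
    Set<String> segments = inFlight.get(tier);
    if (segments == null) {
      segments = new ConcurrentSkipListSet<String>();
      inFlight.put(tier, segments);
    }
    if (!Boolean.TRUE.equals(replicatingLookup.get(tier)) || segments.size() >= MAX_REPLICANTS) {
      return false;
    }
    return segments.add(segmentId);
  }

  // Mirrors unregisterReplicantCreation(tier, segmentId), called from the LoadPeonCallback.
  public void unregister(String tier, String segmentId)
  {
    Set<String> segments = inFlight.get(tier);
    if (segments != null) {
      segments.remove(segmentId);
    }
  }

  public static void main(String[] args)
  {
    ReplicationThrottleDemo throttle = new ReplicationThrottleDemo();

    throttle.update("hot");                                        // run 1: nothing in flight
    System.out.println(throttle.tryRegister("hot", "segment-1"));  // true  - replicant queued

    throttle.update("hot");                                        // run 2: segment-1 still loading
    System.out.println(throttle.tryRegister("hot", "segment-2"));  // false - tier is throttled

    throttle.unregister("hot", "segment-1");                       // load callback fires
    throttle.update("hot");                                        // run 3: queue drained
    System.out.println(throttle.tryRegister("hot", "segment-2"));  // true  - throttle reopened
  }
}

The output (true, false, true) shows the property the patch is after: a tier stops accepting new replicant work while a previously queued replicant is still in flight at the start of a run, and resumes on the first run after the callback clears it.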