diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java index 9ca7fd4c091..a7b3b28358c 100644 --- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java +++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java @@ -24,10 +24,10 @@ import com.google.common.collect.Lists; import com.google.common.collect.MinMaxPriorityQueue; import com.metamx.druid.client.DataSegment; import com.metamx.druid.master.DruidMaster; -import com.metamx.druid.master.ReplicationThrottler; import com.metamx.druid.master.DruidMasterRuntimeParams; import com.metamx.druid.master.LoadPeonCallback; import com.metamx.druid.master.MasterStats; +import com.metamx.druid.master.ReplicationThrottler; import com.metamx.druid.master.ServerHolder; import com.metamx.emitter.EmittingLogger; @@ -74,13 +74,6 @@ public abstract class LoadRule implements Rule List assignedServers = Lists.newArrayList(); while (totalReplicants < expectedReplicants) { - if (totalReplicants > 0) { // don't throttle if there's only 1 copy of this segment in the cluster - if (!replicationManager.canAddReplicant(getTier()) || - !replicationManager.registerReplicantCreation(getTier(), segment.getIdentifier())) { - break; - } - } - ServerHolder holder = serverQueue.pollFirst(); if (holder == null) { log.warn( @@ -118,6 +111,14 @@ public abstract class LoadRule implements Rule break; } + if (totalReplicants > 0) { // don't throttle if there's only 1 copy of this segment in the cluster + if (!replicationManager.canAddReplicant(getTier()) || + !replicationManager.registerReplicantCreation(getTier(), segment.getIdentifier())) { + serverQueue.add(holder); + break; + } + } + holder.getPeon().loadSegment( segment, new LoadPeonCallback() @@ -173,19 +174,20 @@ public abstract class LoadRule implements Rule List droppedServers = Lists.newArrayList(); while (actualNumReplicantsForType > expectedNumReplicantsForType) { - if (expectedNumReplicantsForType > 0) { // don't throttle unless we are removing extra replicants - if (!replicationManager.canDestroyReplicant(getTier()) || - !replicationManager.registerReplicantTermination(getTier(), segment.getIdentifier())) { - break; - } - } - ServerHolder holder = serverQueue.pollLast(); if (holder == null) { log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier()); break; } + if (expectedNumReplicantsForType > 0) { // don't throttle unless we are removing extra replicants + if (!replicationManager.canDestroyReplicant(getTier()) || + !replicationManager.registerReplicantTermination(getTier(), segment.getIdentifier())) { + serverQueue.add(holder); + break; + } + } + if (holder.isServingSegment(segment)) { holder.getPeon().dropSegment( segment,