fix bug in throttle replicants

This commit is contained in:
Fangjin Yang 2013-02-06 18:40:29 -08:00
parent e09c658352
commit 267c797023
1 changed files with 17 additions and 15 deletions

View File

@ -24,10 +24,10 @@ import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.ReplicationThrottler;
import com.metamx.druid.master.DruidMasterRuntimeParams;
import com.metamx.druid.master.LoadPeonCallback;
import com.metamx.druid.master.MasterStats;
import com.metamx.druid.master.ReplicationThrottler;
import com.metamx.druid.master.ServerHolder;
import com.metamx.emitter.EmittingLogger;
@ -74,13 +74,6 @@ public abstract class LoadRule implements Rule
List<ServerHolder> assignedServers = Lists.newArrayList();
while (totalReplicants < expectedReplicants) {
if (totalReplicants > 0) { // don't throttle if there's only 1 copy of this segment in the cluster
if (!replicationManager.canAddReplicant(getTier()) ||
!replicationManager.registerReplicantCreation(getTier(), segment.getIdentifier())) {
break;
}
}
ServerHolder holder = serverQueue.pollFirst();
if (holder == null) {
log.warn(
@ -118,6 +111,14 @@ public abstract class LoadRule implements Rule
break;
}
if (totalReplicants > 0) { // don't throttle if there's only 1 copy of this segment in the cluster
if (!replicationManager.canAddReplicant(getTier()) ||
!replicationManager.registerReplicantCreation(getTier(), segment.getIdentifier())) {
serverQueue.add(holder);
break;
}
}
holder.getPeon().loadSegment(
segment,
new LoadPeonCallback()
@ -173,19 +174,20 @@ public abstract class LoadRule implements Rule
List<ServerHolder> droppedServers = Lists.newArrayList();
while (actualNumReplicantsForType > expectedNumReplicantsForType) {
if (expectedNumReplicantsForType > 0) { // don't throttle unless we are removing extra replicants
if (!replicationManager.canDestroyReplicant(getTier()) ||
!replicationManager.registerReplicantTermination(getTier(), segment.getIdentifier())) {
break;
}
}
ServerHolder holder = serverQueue.pollLast();
if (holder == null) {
log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier());
break;
}
if (expectedNumReplicantsForType > 0) { // don't throttle unless we are removing extra replicants
if (!replicationManager.canDestroyReplicant(getTier()) ||
!replicationManager.registerReplicantTermination(getTier(), segment.getIdentifier())) {
serverQueue.add(holder);
break;
}
}
if (holder.isServingSegment(segment)) {
holder.getPeon().dropSegment(
segment,