diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterRuleRunner.java b/server/src/main/java/com/metamx/druid/master/DruidMasterRuleRunner.java index e30c5ec31d5..2bd4870ff2c 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterRuleRunner.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterRuleRunner.java @@ -33,14 +33,14 @@ public class DruidMasterRuleRunner implements DruidMasterHelper { private static final EmittingLogger log = new EmittingLogger(DruidMasterRuleRunner.class); - private final ReplicationThrottler replicationManager; + private final ReplicationThrottler replicatorThrottler; private final DruidMaster master; public DruidMasterRuleRunner(DruidMaster master, int replicantLifeTime, int replicantThrottleLimit) { this.master = master; - this.replicationManager = new ReplicationThrottler(replicantThrottleLimit, replicantLifeTime); + this.replicatorThrottler = new ReplicationThrottler(replicantThrottleLimit, replicantLifeTime); } @Override @@ -55,12 +55,12 @@ public class DruidMasterRuleRunner implements DruidMasterHelper } for (String tier : cluster.getTierNames()) { - replicationManager.updateReplicationState(tier); - replicationManager.updateTerminationState(tier); + replicatorThrottler.updateReplicationState(tier); + replicatorThrottler.updateTerminationState(tier); } DruidMasterRuntimeParams paramsWithReplicationManager = params.buildFromExisting() - .withReplicationManager(replicationManager) + .withReplicationManager(replicatorThrottler) .build(); // Run through all matched rules for available segments diff --git a/server/src/main/java/com/metamx/druid/master/ReplicationThrottler.java b/server/src/main/java/com/metamx/druid/master/ReplicationThrottler.java index e899ec919bc..3b3e90f78c2 100644 --- a/server/src/main/java/com/metamx/druid/master/ReplicationThrottler.java +++ b/server/src/main/java/com/metamx/druid/master/ReplicationThrottler.java @@ -76,6 +76,7 @@ public class ReplicationThrottler } else { log.info("[%s]: Replicant %s queue is empty.", tier, type); lookup.put(tier, true); + holder.resetLifetime(tier); } } @@ -162,5 +163,10 @@ public class ReplicationThrottler } lifetimes.put(tier, --lifetime); } + + public void resetLifetime(String tier) + { + lifetimes.put(tier, maxLifetime); + } } }