mirror of https://github.com/apache/druid.git
reset lifetimes for the replication throttle
This commit is contained in:
parent
d023d219ae
commit
0d303bee96
|
@ -33,14 +33,14 @@ public class DruidMasterRuleRunner implements DruidMasterHelper
|
||||||
{
|
{
|
||||||
private static final EmittingLogger log = new EmittingLogger(DruidMasterRuleRunner.class);
|
private static final EmittingLogger log = new EmittingLogger(DruidMasterRuleRunner.class);
|
||||||
|
|
||||||
private final ReplicationThrottler replicationManager;
|
private final ReplicationThrottler replicatorThrottler;
|
||||||
|
|
||||||
private final DruidMaster master;
|
private final DruidMaster master;
|
||||||
|
|
||||||
public DruidMasterRuleRunner(DruidMaster master, int replicantLifeTime, int replicantThrottleLimit)
|
public DruidMasterRuleRunner(DruidMaster master, int replicantLifeTime, int replicantThrottleLimit)
|
||||||
{
|
{
|
||||||
this.master = master;
|
this.master = master;
|
||||||
this.replicationManager = new ReplicationThrottler(replicantThrottleLimit, replicantLifeTime);
|
this.replicatorThrottler = new ReplicationThrottler(replicantThrottleLimit, replicantLifeTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -55,12 +55,12 @@ public class DruidMasterRuleRunner implements DruidMasterHelper
|
||||||
}
|
}
|
||||||
|
|
||||||
for (String tier : cluster.getTierNames()) {
|
for (String tier : cluster.getTierNames()) {
|
||||||
replicationManager.updateReplicationState(tier);
|
replicatorThrottler.updateReplicationState(tier);
|
||||||
replicationManager.updateTerminationState(tier);
|
replicatorThrottler.updateTerminationState(tier);
|
||||||
}
|
}
|
||||||
|
|
||||||
DruidMasterRuntimeParams paramsWithReplicationManager = params.buildFromExisting()
|
DruidMasterRuntimeParams paramsWithReplicationManager = params.buildFromExisting()
|
||||||
.withReplicationManager(replicationManager)
|
.withReplicationManager(replicatorThrottler)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
// Run through all matched rules for available segments
|
// Run through all matched rules for available segments
|
||||||
|
|
|
@ -76,6 +76,7 @@ public class ReplicationThrottler
|
||||||
} else {
|
} else {
|
||||||
log.info("[%s]: Replicant %s queue is empty.", tier, type);
|
log.info("[%s]: Replicant %s queue is empty.", tier, type);
|
||||||
lookup.put(tier, true);
|
lookup.put(tier, true);
|
||||||
|
holder.resetLifetime(tier);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -162,5 +163,10 @@ public class ReplicationThrottler
|
||||||
}
|
}
|
||||||
lifetimes.put(tier, --lifetime);
|
lifetimes.put(tier, --lifetime);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void resetLifetime(String tier)
|
||||||
|
{
|
||||||
|
lifetimes.put(tier, maxLifetime);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue