mirror of https://github.com/apache/druid.git
fix dynamic config bug
This commit is contained in:
parent
7cfe86b7bc
commit
fb1d6c177a
|
@ -33,15 +33,21 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
public class ReplicationThrottler
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(ReplicationThrottler.class);
|
||||
private final int maxReplicants;
|
||||
private final int maxLifetime;
|
||||
|
||||
private final Map<String, Boolean> replicatingLookup = Maps.newHashMap();
|
||||
private final Map<String, Boolean> terminatingLookup = Maps.newHashMap();
|
||||
private final ReplicatorSegmentHolder currentlyReplicating = new ReplicatorSegmentHolder();
|
||||
private final ReplicatorSegmentHolder currentlyTerminating = new ReplicatorSegmentHolder();
|
||||
|
||||
private volatile int maxReplicants;
|
||||
private volatile int maxLifetime;
|
||||
|
||||
public ReplicationThrottler(int maxReplicants, int maxLifetime)
|
||||
{
|
||||
updateParams(maxReplicants, maxLifetime);
|
||||
}
|
||||
|
||||
public void updateParams(int maxReplicants, int maxLifetime)
|
||||
{
|
||||
this.maxReplicants = maxReplicants;
|
||||
this.maxLifetime = maxLifetime;
|
||||
|
|
|
@ -62,6 +62,11 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
|
|||
@Override
|
||||
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
|
||||
{
|
||||
replicatorThrottler.updateParams(
|
||||
coordinator.getDynamicConfigs().getReplicationThrottleLimit(),
|
||||
coordinator.getDynamicConfigs().getReplicantLifetime()
|
||||
);
|
||||
|
||||
CoordinatorStats stats = new CoordinatorStats();
|
||||
DruidCluster cluster = params.getDruidCluster();
|
||||
|
||||
|
|
|
@ -93,6 +93,7 @@ public class DruidCoordinatorRuleRunnerTest
|
|||
@After
|
||||
public void tearDown() throws Exception
|
||||
{
|
||||
//EasyMock.verify(coordinator);
|
||||
EasyMock.verify(databaseRuleManager);
|
||||
}
|
||||
|
||||
|
@ -107,6 +108,7 @@ public class DruidCoordinatorRuleRunnerTest
|
|||
@Test
|
||||
public void testRunThreeTiersOneReplicant() throws Exception
|
||||
{
|
||||
mockCoordinator();
|
||||
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
|
||||
EasyMock.expectLastCall().atLeastOnce();
|
||||
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
|
||||
|
@ -206,6 +208,7 @@ public class DruidCoordinatorRuleRunnerTest
|
|||
@Test
|
||||
public void testRunTwoTiersTwoReplicants() throws Exception
|
||||
{
|
||||
mockCoordinator();
|
||||
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
|
||||
EasyMock.expectLastCall().atLeastOnce();
|
||||
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
|
||||
|
@ -298,6 +301,7 @@ public class DruidCoordinatorRuleRunnerTest
|
|||
@Test
|
||||
public void testRunTwoTiersWithExistingSegments() throws Exception
|
||||
{
|
||||
mockCoordinator();
|
||||
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
|
||||
EasyMock.expectLastCall().atLeastOnce();
|
||||
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
|
||||
|
@ -379,6 +383,7 @@ public class DruidCoordinatorRuleRunnerTest
|
|||
@Test
|
||||
public void testRunTwoTiersTierDoesNotExist() throws Exception
|
||||
{
|
||||
mockCoordinator();
|
||||
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
|
||||
EasyMock.expectLastCall().atLeastOnce();
|
||||
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
|
||||
|
@ -437,6 +442,7 @@ public class DruidCoordinatorRuleRunnerTest
|
|||
@Test
|
||||
public void testRunRuleDoesNotExist() throws Exception
|
||||
{
|
||||
mockCoordinator();
|
||||
emitter.emit(EasyMock.<ServiceEventBuilder>anyObject());
|
||||
EasyMock.expectLastCall().times(availableSegments.size());
|
||||
EasyMock.replay(emitter);
|
||||
|
@ -486,16 +492,12 @@ public class DruidCoordinatorRuleRunnerTest
|
|||
@Test
|
||||
public void testDropRemove() throws Exception
|
||||
{
|
||||
mockCoordinator();
|
||||
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
|
||||
EasyMock.expectLastCall().atLeastOnce();
|
||||
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
|
||||
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
|
||||
EasyMock.replay(mockPeon);
|
||||
|
||||
coordinator.removeSegment(EasyMock.<DataSegment>anyObject());
|
||||
EasyMock.expectLastCall().atLeastOnce();
|
||||
EasyMock.replay(coordinator);
|
||||
|
||||
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
|
||||
Lists.<Rule>newArrayList(
|
||||
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), null, 1, "normal"),
|
||||
|
@ -552,6 +554,7 @@ public class DruidCoordinatorRuleRunnerTest
|
|||
@Test
|
||||
public void testDropTooManyInSameTier() throws Exception
|
||||
{
|
||||
mockCoordinator();
|
||||
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
|
||||
EasyMock.expectLastCall().atLeastOnce();
|
||||
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
|
||||
|
@ -629,6 +632,7 @@ public class DruidCoordinatorRuleRunnerTest
|
|||
@Test
|
||||
public void testDropTooManyInDifferentTiers() throws Exception
|
||||
{
|
||||
mockCoordinator();
|
||||
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
|
||||
EasyMock.expectLastCall().atLeastOnce();
|
||||
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
|
||||
|
@ -712,6 +716,7 @@ public class DruidCoordinatorRuleRunnerTest
|
|||
@Test
|
||||
public void testDontDropInDifferentTiers() throws Exception
|
||||
{
|
||||
mockCoordinator();
|
||||
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
|
||||
EasyMock.expectLastCall().atLeastOnce();
|
||||
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
|
||||
|
@ -791,6 +796,7 @@ public class DruidCoordinatorRuleRunnerTest
|
|||
@Test
|
||||
public void testDropServerActuallyServesSegment() throws Exception
|
||||
{
|
||||
mockCoordinator();
|
||||
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
|
||||
Lists.<Rule>newArrayList(
|
||||
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T01:00:00.000Z"), null, 0, "normal")
|
||||
|
@ -889,6 +895,7 @@ public class DruidCoordinatorRuleRunnerTest
|
|||
@Test
|
||||
public void testReplicantThrottle() throws Exception
|
||||
{
|
||||
mockCoordinator();
|
||||
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
|
||||
EasyMock.expectLastCall().atLeastOnce();
|
||||
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
|
||||
|
@ -989,6 +996,14 @@ public class DruidCoordinatorRuleRunnerTest
|
|||
@Test
|
||||
public void testReplicantThrottleAcrossTiers() throws Exception
|
||||
{
|
||||
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(
|
||||
new CoordinatorDynamicConfig(
|
||||
0, 0, 0, 0, 1, 7, 0, false
|
||||
)
|
||||
).atLeastOnce();
|
||||
coordinator.removeSegment(EasyMock.<DataSegment>anyObject());
|
||||
EasyMock.expectLastCall().anyTimes();
|
||||
EasyMock.replay(coordinator);
|
||||
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
|
||||
EasyMock.expectLastCall().atLeastOnce();
|
||||
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
|
||||
|
@ -1070,6 +1085,7 @@ public class DruidCoordinatorRuleRunnerTest
|
|||
@Test
|
||||
public void testDropReplicantThrottle() throws Exception
|
||||
{
|
||||
mockCoordinator();
|
||||
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
|
||||
EasyMock.expectLastCall().atLeastOnce();
|
||||
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
|
||||
|
@ -1155,4 +1171,16 @@ public class DruidCoordinatorRuleRunnerTest
|
|||
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 24);
|
||||
EasyMock.verify(mockPeon);
|
||||
}
|
||||
|
||||
private void mockCoordinator()
|
||||
{
|
||||
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(
|
||||
new CoordinatorDynamicConfig(
|
||||
0, 0, 0, 0, 1, 24, 0, false
|
||||
)
|
||||
).anyTimes();
|
||||
coordinator.removeSegment(EasyMock.<DataSegment>anyObject());
|
||||
EasyMock.expectLastCall().anyTimes();
|
||||
EasyMock.replay(coordinator);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue