Merge pull request #759 from metamx/fix-dynamic-config

fix dynamic config bug
This commit is contained in:
Gian Merlino 2014-09-29 15:27:35 -07:00
commit 5d964d2db6
3 changed files with 50 additions and 2 deletions

View File

@ -33,15 +33,21 @@ import java.util.concurrent.ConcurrentHashMap;
public class ReplicationThrottler
{
private static final EmittingLogger log = new EmittingLogger(ReplicationThrottler.class);
private final int maxReplicants;
private final int maxLifetime;
private final Map<String, Boolean> replicatingLookup = Maps.newHashMap();
private final Map<String, Boolean> terminatingLookup = Maps.newHashMap();
private final ReplicatorSegmentHolder currentlyReplicating = new ReplicatorSegmentHolder();
private final ReplicatorSegmentHolder currentlyTerminating = new ReplicatorSegmentHolder();
private volatile int maxReplicants;
private volatile int maxLifetime;
public ReplicationThrottler(int maxReplicants, int maxLifetime)
{
updateParams(maxReplicants, maxLifetime);
}
public void updateParams(int maxReplicants, int maxLifetime)
{
this.maxReplicants = maxReplicants;
this.maxLifetime = maxLifetime;

View File

@ -62,6 +62,11 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
replicatorThrottler.updateParams(
coordinator.getDynamicConfigs().getReplicationThrottleLimit(),
coordinator.getDynamicConfigs().getReplicantLifetime()
);
CoordinatorStats stats = new CoordinatorStats();
DruidCluster cluster = params.getDruidCluster();

View File

@ -93,6 +93,7 @@ public class DruidCoordinatorRuleRunnerTest
@After
public void tearDown() throws Exception
{
EasyMock.verify(coordinator);
EasyMock.verify(databaseRuleManager);
}
@ -107,6 +108,7 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testRunThreeTiersOneReplicant() throws Exception
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
@ -206,6 +208,7 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testRunTwoTiersTwoReplicants() throws Exception
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
@ -298,6 +301,7 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testRunTwoTiersWithExistingSegments() throws Exception
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
@ -379,6 +383,7 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testRunTwoTiersTierDoesNotExist() throws Exception
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
@ -437,6 +442,7 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testRunRuleDoesNotExist() throws Exception
{
mockCoordinator();
emitter.emit(EasyMock.<ServiceEventBuilder>anyObject());
EasyMock.expectLastCall().times(availableSegments.size());
EasyMock.replay(emitter);
@ -492,6 +498,11 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(
new CoordinatorDynamicConfig(
0, 0, 0, 0, 1, 24, 0, false
)
).anyTimes();
coordinator.removeSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(coordinator);
@ -552,6 +563,7 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testDropTooManyInSameTier() throws Exception
{
mockCoordinator();
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
@ -629,6 +641,7 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testDropTooManyInDifferentTiers() throws Exception
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
@ -712,6 +725,7 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testDontDropInDifferentTiers() throws Exception
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
@ -791,6 +805,7 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testDropServerActuallyServesSegment() throws Exception
{
mockCoordinator();
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T01:00:00.000Z"), null, 0, "normal")
@ -889,6 +904,7 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testReplicantThrottle() throws Exception
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
@ -989,6 +1005,14 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testReplicantThrottleAcrossTiers() throws Exception
{
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(
new CoordinatorDynamicConfig(
0, 0, 0, 0, 1, 7, 0, false
)
).atLeastOnce();
coordinator.removeSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(coordinator);
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
@ -1070,6 +1094,7 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testDropReplicantThrottle() throws Exception
{
mockCoordinator();
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
@ -1155,4 +1180,16 @@ public class DruidCoordinatorRuleRunnerTest
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 24);
EasyMock.verify(mockPeon);
}
private void mockCoordinator()
{
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(
new CoordinatorDynamicConfig(
0, 0, 0, 0, 1, 24, 0, false
)
).anyTimes();
coordinator.removeSegment(EasyMock.<DataSegment>anyObject());
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(coordinator);
}
}