From fb1d6c177a6cad6e60b4501d19d40c5de00575c7 Mon Sep 17 00:00:00 2001 From: fjy Date: Fri, 26 Sep 2014 15:45:20 -0700 Subject: [PATCH 1/2] fix dynamic config bug --- .../coordinator/ReplicationThrottler.java | 10 ++++- .../helper/DruidCoordinatorRuleRunner.java | 5 +++ .../DruidCoordinatorRuleRunnerTest.java | 38 ++++++++++++++++--- 3 files changed, 46 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/ReplicationThrottler.java b/server/src/main/java/io/druid/server/coordinator/ReplicationThrottler.java index 2559bc5ce51..9b8cfeb6247 100644 --- a/server/src/main/java/io/druid/server/coordinator/ReplicationThrottler.java +++ b/server/src/main/java/io/druid/server/coordinator/ReplicationThrottler.java @@ -33,15 +33,21 @@ import java.util.concurrent.ConcurrentHashMap; public class ReplicationThrottler { private static final EmittingLogger log = new EmittingLogger(ReplicationThrottler.class); - private final int maxReplicants; - private final int maxLifetime; private final Map replicatingLookup = Maps.newHashMap(); private final Map terminatingLookup = Maps.newHashMap(); private final ReplicatorSegmentHolder currentlyReplicating = new ReplicatorSegmentHolder(); private final ReplicatorSegmentHolder currentlyTerminating = new ReplicatorSegmentHolder(); + private volatile int maxReplicants; + private volatile int maxLifetime; + public ReplicationThrottler(int maxReplicants, int maxLifetime) + { + updateParams(maxReplicants, maxLifetime); + } + + public void updateParams(int maxReplicants, int maxLifetime) { this.maxReplicants = maxReplicants; this.maxLifetime = maxLifetime; diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java index e0cb457418b..899766b7e6f 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java @@ -62,6 +62,11 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { + replicatorThrottler.updateParams( + coordinator.getDynamicConfigs().getReplicationThrottleLimit(), + coordinator.getDynamicConfigs().getReplicantLifetime() + ); + CoordinatorStats stats = new CoordinatorStats(); DruidCluster cluster = params.getDruidCluster(); diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index 4fafd3f2bd6..90a1b3da786 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -93,6 +93,7 @@ public class DruidCoordinatorRuleRunnerTest @After public void tearDown() throws Exception { + //EasyMock.verify(coordinator); EasyMock.verify(databaseRuleManager); } @@ -107,6 +108,7 @@ public class DruidCoordinatorRuleRunnerTest @Test public void testRunThreeTiersOneReplicant() throws Exception { + mockCoordinator(); mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); @@ -206,6 +208,7 @@ public class DruidCoordinatorRuleRunnerTest @Test public void testRunTwoTiersTwoReplicants() throws Exception { + mockCoordinator(); mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); @@ -298,6 +301,7 @@ public class DruidCoordinatorRuleRunnerTest @Test public void testRunTwoTiersWithExistingSegments() throws Exception { + mockCoordinator(); mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); @@ -379,6 +383,7 @@ public class DruidCoordinatorRuleRunnerTest @Test public void testRunTwoTiersTierDoesNotExist() throws Exception { + mockCoordinator(); mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); @@ -437,6 +442,7 @@ public class DruidCoordinatorRuleRunnerTest @Test public void testRunRuleDoesNotExist() throws Exception { + mockCoordinator(); emitter.emit(EasyMock.anyObject()); EasyMock.expectLastCall().times(availableSegments.size()); EasyMock.replay(emitter); @@ -486,16 +492,12 @@ public class DruidCoordinatorRuleRunnerTest @Test public void testDropRemove() throws Exception { + mockCoordinator(); mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); EasyMock.replay(mockPeon); - - coordinator.removeSegment(EasyMock.anyObject()); - EasyMock.expectLastCall().atLeastOnce(); - EasyMock.replay(coordinator); - EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( Lists.newArrayList( new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), null, 1, "normal"), @@ -552,6 +554,7 @@ public class DruidCoordinatorRuleRunnerTest @Test public void testDropTooManyInSameTier() throws Exception { + mockCoordinator(); mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); @@ -629,6 +632,7 @@ public class DruidCoordinatorRuleRunnerTest @Test public void testDropTooManyInDifferentTiers() throws Exception { + mockCoordinator(); mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); @@ -712,6 +716,7 @@ public class DruidCoordinatorRuleRunnerTest @Test public void testDontDropInDifferentTiers() throws Exception { + mockCoordinator(); mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); @@ -791,6 +796,7 @@ public class DruidCoordinatorRuleRunnerTest @Test public void testDropServerActuallyServesSegment() throws Exception { + mockCoordinator(); EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( Lists.newArrayList( new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T01:00:00.000Z"), null, 0, "normal") @@ -889,6 +895,7 @@ public class DruidCoordinatorRuleRunnerTest @Test public void testReplicantThrottle() throws Exception { + mockCoordinator(); mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); @@ -989,6 +996,14 @@ public class DruidCoordinatorRuleRunnerTest @Test public void testReplicantThrottleAcrossTiers() throws Exception { + EasyMock.expect(coordinator.getDynamicConfigs()).andReturn( + new CoordinatorDynamicConfig( + 0, 0, 0, 0, 1, 7, 0, false + ) + ).atLeastOnce(); + coordinator.removeSegment(EasyMock.anyObject()); + EasyMock.expectLastCall().anyTimes(); + EasyMock.replay(coordinator); mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); @@ -1070,6 +1085,7 @@ public class DruidCoordinatorRuleRunnerTest @Test public void testDropReplicantThrottle() throws Exception { + mockCoordinator(); mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); @@ -1155,4 +1171,16 @@ public class DruidCoordinatorRuleRunnerTest Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 24); EasyMock.verify(mockPeon); } + + private void mockCoordinator() + { + EasyMock.expect(coordinator.getDynamicConfigs()).andReturn( + new CoordinatorDynamicConfig( + 0, 0, 0, 0, 1, 24, 0, false + ) + ).anyTimes(); + coordinator.removeSegment(EasyMock.anyObject()); + EasyMock.expectLastCall().anyTimes(); + EasyMock.replay(coordinator); + } } From 1a67ebe861c197a0d84578f39655c6c2c651a2cb Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 29 Sep 2014 15:22:26 -0700 Subject: [PATCH 2/2] fix regression in test --- .../coordinator/DruidCoordinatorRuleRunnerTest.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index 90a1b3da786..2aae37364ca 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -93,7 +93,7 @@ public class DruidCoordinatorRuleRunnerTest @After public void tearDown() throws Exception { - //EasyMock.verify(coordinator); + EasyMock.verify(coordinator); EasyMock.verify(databaseRuleManager); } @@ -492,12 +492,21 @@ public class DruidCoordinatorRuleRunnerTest @Test public void testDropRemove() throws Exception { - mockCoordinator(); mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); EasyMock.replay(mockPeon); + + EasyMock.expect(coordinator.getDynamicConfigs()).andReturn( + new CoordinatorDynamicConfig( + 0, 0, 0, 0, 1, 24, 0, false + ) + ).anyTimes(); + coordinator.removeSegment(EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + EasyMock.replay(coordinator); + EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( Lists.newArrayList( new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), null, 1, "normal"),