From 74a977504ea4ed5b2290f8e26166ff871f7c7333 Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Fri, 14 Dec 2012 17:06:03 -0800 Subject: [PATCH] bug fix for dropping segments in master --- .../query/timeboundary/TimeBoundaryQuery.java | 4 +- .../com/metamx/druid/master/ServerHolder.java | 7 +- .../metamx/druid/master/rules/LoadRule.java | 26 +-- .../master/DruidMasterRuleRunnerTest.java | 149 ++++++++++++++++-- 4 files changed, 162 insertions(+), 24 deletions(-) diff --git a/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQuery.java b/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQuery.java index 541d5342c73..9c7340595c5 100644 --- a/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQuery.java +++ b/client/src/main/java/com/metamx/druid/query/timeboundary/TimeBoundaryQuery.java @@ -43,8 +43,8 @@ import java.util.Map; public class TimeBoundaryQuery extends BaseQuery> { public static final Interval MY_Y2K_INTERVAL = new Interval( - new DateTime(Long.MIN_VALUE), - new DateTime(Long.MAX_VALUE) + new DateTime("0000-01-01"), + new DateTime("9000-01-01") ); public static final String MAX_TIME = "maxTime"; public static final String MIN_TIME = "minTime"; diff --git a/server/src/main/java/com/metamx/druid/master/ServerHolder.java b/server/src/main/java/com/metamx/druid/master/ServerHolder.java index 6cf4d65ce3f..82c31fd7337 100644 --- a/server/src/main/java/com/metamx/druid/master/ServerHolder.java +++ b/server/src/main/java/com/metamx/druid/master/ServerHolder.java @@ -94,9 +94,14 @@ public class ServerHolder implements Comparable return availableSize; } + public boolean isServingSegment(DataSegment segment) + { + return (server.getSegment(segment.getIdentifier()) != null); + } + public boolean containsSegment(DataSegment segment) { - return (server.getSegment(segment.getIdentifier()) != null || peon.getSegmentsToLoad().contains(segment)); + return isServingSegment(segment) || peon.getSegmentsToLoad().contains(segment); } @Override diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java index b3d256d2066..f3b5de84bf9 100644 --- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java +++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java @@ -163,23 +163,25 @@ public abstract class LoadRule implements Rule while (actualNumReplicantsForType > expectedNumReplicantsForType) { ServerHolder holder = serverQueue.pollLast(); if (holder == null) { - log.warn("Wtf, holder was null? Do I have no servers[%s]?", serverQueue); - continue; + log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier()); + break; } - holder.getPeon().dropSegment( - segment, - new LoadPeonCallback() - { - @Override - protected void execute() + if (holder.isServingSegment(segment)) { + holder.getPeon().dropSegment( + segment, + new LoadPeonCallback() { + @Override + protected void execute() + { + } } - } - ); + ); + --actualNumReplicantsForType; + stats.addToTieredStat("droppedCount", tier, 1); + } droppedServers.add(holder); - --actualNumReplicantsForType; - stats.addToTieredStat("droppedCount", tier, 1); } serverQueue.addAll(droppedServers); } diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterRuleRunnerTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterRuleRunnerTest.java index c31dc670b82..acde8a14707 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterRuleRunnerTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterRuleRunnerTest.java @@ -31,7 +31,6 @@ import com.metamx.druid.db.DatabaseRuleManager; import com.metamx.druid.master.rules.IntervalDropRule; import com.metamx.druid.master.rules.IntervalLoadRule; import com.metamx.druid.master.rules.Rule; -import com.metamx.druid.master.rules.RuleMap; import com.metamx.druid.shard.NoneShardSpec; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; @@ -86,14 +85,6 @@ public class DruidMasterRuleRunnerTest } ruleRunner = new DruidMasterRuleRunner(master); - - mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); - EasyMock.expectLastCall().anyTimes(); - mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); - EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).anyTimes(); - EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); - EasyMock.replay(mockPeon); } @After @@ -113,6 +104,12 @@ public class DruidMasterRuleRunnerTest @Test public void testRunThreeTiersOneReplicant() throws Exception { + mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); + EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); + EasyMock.replay(mockPeon); + EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( Lists.newArrayList( new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), 1, "hot"), @@ -202,6 +199,12 @@ public class DruidMasterRuleRunnerTest @Test public void testRunTwoTiersTwoReplicants() throws Exception { + mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); + EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); + EasyMock.replay(mockPeon); + EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( Lists.newArrayList( new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), 2, "hot"), @@ -284,6 +287,12 @@ public class DruidMasterRuleRunnerTest @Test public void testRunTwoTiersWithExistingSegments() throws Exception { + mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); + EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); + EasyMock.replay(mockPeon); + EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( Lists.newArrayList( new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"), @@ -356,6 +365,12 @@ public class DruidMasterRuleRunnerTest @Test public void testRunTwoTiersTierDoesNotExist() throws Exception { + mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); + EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); + EasyMock.replay(mockPeon); + emitter.emit(EasyMock.anyObject()); EasyMock.expectLastCall().times(12); EasyMock.replay(emitter); @@ -455,6 +470,12 @@ public class DruidMasterRuleRunnerTest @Test public void testDropRemove() throws Exception { + mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); + EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); + EasyMock.replay(mockPeon); + master.removeSegment(EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); EasyMock.replay(master); @@ -513,6 +534,12 @@ public class DruidMasterRuleRunnerTest @Test public void testDropTooManyInSameTier() throws Exception { + mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); + EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); + EasyMock.replay(mockPeon); + EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( Lists.newArrayList( new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "normal"), @@ -581,6 +608,14 @@ public class DruidMasterRuleRunnerTest @Test public void testDropTooManyInDifferentTiers() throws Exception { + mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); + EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); + EasyMock.replay(mockPeon); + EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( Lists.newArrayList( new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"), @@ -653,6 +688,14 @@ public class DruidMasterRuleRunnerTest @Test public void testDontDropInDifferentTiers() throws Exception { + mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); + EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); + EasyMock.replay(mockPeon); + EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( Lists.newArrayList( new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"), @@ -717,4 +760,92 @@ public class DruidMasterRuleRunnerTest Assert.assertTrue(stats.getPerTierStats().get("droppedCount") == null); Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12); } + + @Test + public void testDropServerActuallyServesSegment() throws Exception + { + EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( + Lists.newArrayList( + new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T01:00:00.000Z"), 0, "normal") + ) + ).atLeastOnce(); + EasyMock.replay(databaseRuleManager); + + DruidServer server1 = new DruidServer( + "server1", + "host1", + 1000, + "historical", + "normal" + ); + server1.addDataSegment(availableSegments.get(0).getIdentifier(), availableSegments.get(0)); + DruidServer server2 = new DruidServer( + "serverNorm2", + "hostNorm2", + 1000, + "historical", + "normal" + ); + server2.addDataSegment(availableSegments.get(1).getIdentifier(), availableSegments.get(1)); + DruidServer server3 = new DruidServer( + "serverNorm3", + "hostNorm3", + 1000, + "historical", + "normal" + ); + server3.addDataSegment(availableSegments.get(1).getIdentifier(), availableSegments.get(1)); + server3.addDataSegment(availableSegments.get(2).getIdentifier(), availableSegments.get(2)); + + mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); + EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); + EasyMock.replay(mockPeon); + + LoadQueuePeon anotherMockPeon = EasyMock.createMock(LoadQueuePeon.class); + EasyMock.expect(anotherMockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); + EasyMock.expect(anotherMockPeon.getLoadQueueSize()).andReturn(10L).atLeastOnce(); + EasyMock.replay(anotherMockPeon); + + DruidCluster druidCluster = new DruidCluster( + ImmutableMap.of( + "normal", + MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( + Arrays.asList( + new ServerHolder( + server1, + mockPeon + ), + new ServerHolder( + server2, + anotherMockPeon + ), + new ServerHolder( + server3, + anotherMockPeon + ) + ) + ) + ) + ); + + SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); + + DruidMasterRuntimeParams params = new DruidMasterRuntimeParams.Builder() + .withDruidCluster(druidCluster) + .withMillisToWaitBeforeDeleting(0L) + .withAvailableSegments(availableSegments) + .withDatabaseRuleManager(databaseRuleManager) + .withSegmentReplicantLookup(segmentReplicantLookup) + .build(); + + DruidMasterRuntimeParams afterParams = ruleRunner.run(params); + MasterStats stats = afterParams.getMasterStats(); + + Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 1); + + EasyMock.verify(mockPeon); + EasyMock.verify(anotherMockPeon); + } }