From badc7b2e3fde37e57f4009f2b17135ff1fb7968c Mon Sep 17 00:00:00 2001
From: fjy
Date: Tue, 4 Feb 2014 15:21:45 -0800
Subject: [PATCH] fix bug with load throttling and more tests

---
 .../coordinator/SegmentReplicantLookup.java        | 37 ++++++--
 .../server/coordinator/rules/LoadRule.java         | 71 ++++++++-------
 .../DruidCoordinatorRuleRunnerTest.java            | 88 +++++++++++++++++++
 .../coordinator/rules/LoadRuleTest.java            |  2 +-
 4 files changed, 160 insertions(+), 38 deletions(-)

diff --git a/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java b/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java
index a90eeba4cae..fddaf0d7c05 100644
--- a/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java
+++ b/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java
@@ -83,12 +83,22 @@ public class SegmentReplicantLookup
   }
 
   public Map<String, Integer> getLoadingTiers(String segmentId)
-  {
-    Map<String, Integer> retVal = loadingSegments.row(segmentId);
-    return (retVal == null) ? Maps.newHashMap() : retVal;
-  }
+  {
+    Map<String, Integer> retVal = loadingSegments.row(segmentId);
+    return (retVal == null) ? Maps.newHashMap() : retVal;
+  }
 
-  public int getClusterReplicants(String segmentId, String tier)
+  public int getLoadedReplicants(String segmentId)
+  {
+    Map<String, Integer> allTiers = segmentsInCluster.row(segmentId);
+    int retVal = 0;
+    for (Integer replicants : allTiers.values()) {
+      retVal += replicants;
+    }
+    return retVal;
+  }
+
+  public int getLoadedReplicants(String segmentId, String tier)
   {
     Integer retVal = segmentsInCluster.get(segmentId, tier);
     return (retVal == null) ? 0 : retVal;
@@ -100,8 +110,23 @@ public class SegmentReplicantLookup
     return (retVal == null) ? 0 : retVal;
   }
 
+  public int getLoadingReplicants(String segmentId)
+  {
+    Map<String, Integer> allTiers = loadingSegments.row(segmentId);
+    int retVal = 0;
+    for (Integer replicants : allTiers.values()) {
+      retVal += replicants;
+    }
+    return retVal;
+  }
+
+  public int getTotalReplicants(String segmentId)
+  {
+    return getLoadedReplicants(segmentId) + getLoadingReplicants(segmentId);
+  }
+
   public int getTotalReplicants(String segmentId, String tier)
   {
-    return getClusterReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier);
+    return getLoadedReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier);
   }
 }
diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java
index f629318395b..cac7a5b8c28 100644
--- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java
+++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java
@@ -42,6 +42,8 @@ import java.util.Map;
 public abstract class LoadRule implements Rule
 {
   private static final EmittingLogger log = new EmittingLogger(LoadRule.class);
+  private static final String assignedCount = "assignedCount";
+  private static final String droppedCount = "droppedCount";
 
   @Override
   public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment)
@@ -49,13 +51,17 @@ public abstract class LoadRule implements Rule
     CoordinatorStats stats = new CoordinatorStats();
 
     final Map<String, Integer> loadStatus = Maps.newHashMap();
+
+    int totalReplicantsInCluster = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier());
     for (Map.Entry<String, Integer> entry : getTieredReplicants().entrySet()) {
       final String tier = entry.getKey();
-      final int expectedReplicants = entry.getValue();
+      final int expectedReplicantsInTier = entry.getValue();
+      final int totalReplicantsInTier = params.getSegmentReplicantLookup()
+                                              .getTotalReplicants(segment.getIdentifier(), tier);
+      final int loadedReplicantsInTier = params.getSegmentReplicantLookup()
+                                               .getLoadedReplicants(segment.getIdentifier(), tier);
 
-      int totalReplicants = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier(), tier);
-
-      MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(tier);
+      final MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(tier);
       if (serverQueue == null) {
         log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit();
         return stats;
@@ -65,22 +71,21 @@ public abstract class LoadRule implements Rule
       final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
       final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp);
       if (params.getAvailableSegments().contains(segment)) {
-        stats.accumulate(
-            assign(
-                params.getReplicationManager(),
-                tier,
-                expectedReplicants,
-                totalReplicants,
-                strategy,
-                serverHolderList,
-                segment
-            )
+        CoordinatorStats assignStats = assign(
+            params.getReplicationManager(),
+            tier,
+            totalReplicantsInCluster,
+            expectedReplicantsInTier,
+            totalReplicantsInTier,
+            strategy,
+            serverHolderList,
+            segment
         );
+        stats.accumulate(assignStats);
+        totalReplicantsInCluster += assignStats.getPerTierStats().get(assignedCount).get(tier).get();
       }
 
-      int clusterReplicants = params.getSegmentReplicantLookup()
-                                    .getClusterReplicants(segment.getIdentifier(), tier);
-      loadStatus.put(tier, expectedReplicants - clusterReplicants);
+      loadStatus.put(tier, expectedReplicantsInTier - loadedReplicantsInTier);
     }
     // Remove over-replication
     stats.accumulate(drop(loadStatus, segment, params));
@@ -92,18 +97,21 @@
   private CoordinatorStats assign(
       final ReplicationThrottler replicationManager,
       final String tier,
-      final int expectedReplicants,
-      int totalReplicants,
+      final int totalReplicantsInCluster,
+      final int expectedReplicantsInTier,
+      final int totalReplicantsInTier,
       final BalancerStrategy strategy,
       final List<ServerHolder> serverHolderList,
       final DataSegment segment
   )
   {
     final CoordinatorStats stats = new CoordinatorStats();
-    stats.addToTieredStat("assignedCount", tier, 0);
+    stats.addToTieredStat(assignedCount, tier, 0);
 
-    while (totalReplicants < expectedReplicants) {
-      boolean replicate = totalReplicants > 0;
+    int currReplicantsInTier = totalReplicantsInTier;
+    int currTotalReplicantsInCluster = totalReplicantsInCluster;
+    while (currReplicantsInTier < expectedReplicantsInTier) {
+      boolean replicate = currTotalReplicantsInCluster > 0;
 
       if (replicate && !replicationManager.canCreateReplicant(tier)) {
         break;
@@ -116,7 +124,7 @@
             "Not enough [%s] servers or node capacity to assign segment[%s]! Expected Replicants[%d]",
             tier,
             segment.getIdentifier(),
-            expectedReplicants
+            expectedReplicantsInTier
         );
         break;
       }
@@ -143,8 +151,9 @@
           }
       );
-      stats.addToTieredStat("assignedCount", tier, 1);
-      ++totalReplicants;
+      stats.addToTieredStat(assignedCount, tier, 1);
+      ++currReplicantsInTier;
+      ++currTotalReplicantsInCluster;
     }
 
     return stats;
   }
@@ -162,7 +171,7 @@
       return stats;
     }
 
-    // Make sure we have enough actual replicants in the correct tiers in the cluster before doing anything
+    // Make sure we have enough loaded replicants in the correct tiers in the cluster before doing anything
     for (Integer leftToLoad : loadStatus.values()) {
       if (leftToLoad > 0) {
         return stats;
@@ -176,10 +185,10 @@
 
     for (Map.Entry<String, Integer> entry : replicantsByTier.entrySet()) {
       final String tier = entry.getKey();
-      int actualNumReplicantsForTier = entry.getValue();
+      int loadedNumReplicantsForTier = entry.getValue();
       int expectedNumReplicantsForTier = getNumReplicants(tier);
 
-      stats.addToTieredStat("droppedCount", tier, 0);
+      stats.addToTieredStat(droppedCount, tier, 0);
 
       MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().get(tier);
       if (serverQueue == null) {
@@ -188,7 +197,7 @@
       }
 
       List<ServerHolder> droppedServers = Lists.newArrayList();
-      while (actualNumReplicantsForTier > expectedNumReplicantsForTier) {
+      while (loadedNumReplicantsForTier > expectedNumReplicantsForTier) {
         final ServerHolder holder = serverQueue.pollLast();
         if (holder == null) {
           log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier());
@@ -224,8 +233,8 @@
               }
             }
         );
-        --actualNumReplicantsForTier;
-        stats.addToTieredStat("droppedCount", tier, 1);
+        --loadedNumReplicantsForTier;
+        stats.addToTieredStat(droppedCount, tier, 1);
       }
       droppedServers.add(holder);
     }
diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
index 2663a4a94e4..95d52c1fad1 100644
--- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
@@ -979,6 +979,94 @@ public class DruidCoordinatorRuleRunnerTest
     EasyMock.verify(mockPeon);
   }
 
+  /**
+   * Nodes:
+   * hot - nothing loaded
+   * _default_tier - 1 segment loaded
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testReplicantThrottleAcrossTiers() throws Exception
+  {
+    mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
+    EasyMock.expectLastCall().atLeastOnce();
+    EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
+    EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
+    EasyMock.replay(mockPeon);
+
+    EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
+        Lists.<Rule>newArrayList(
+            new IntervalLoadRule(
+                new Interval("2012-01-01T00:00:00.000Z/2013-01-01T00:00:00.000Z"),
+                ImmutableMap.<String, Integer>of(
+                    "hot", 1,
+                    DruidServer.DEFAULT_TIER, 1
+                ),
+                null,
+                null
+            )
+        )
+    ).atLeastOnce();
+    EasyMock.replay(databaseRuleManager);
+
+    DruidCluster druidCluster = new DruidCluster(
+        ImmutableMap.of(
+            "hot",
+            MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
+                Arrays.asList(
+                    new ServerHolder(
+                        new DruidServer(
+                            "serverHot",
"hostHot", + 1000, + "historical", + "hot", + 0 + ), + mockPeon + ) + ) + ), + DruidServer.DEFAULT_TIER, + MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( + Arrays.asList( + new ServerHolder( + new DruidServer( + "serverNorm", + "hostNorm", + 1000, + "historical", + DruidServer.DEFAULT_TIER, + 0 + ), + mockPeon + ) + ) + ) + ) + ); + + DruidCoordinatorRuntimeParams params = + new DruidCoordinatorRuntimeParams.Builder() + .withDruidCluster(druidCluster) + .withAvailableSegments(availableSegments) + .withDatabaseRuleManager(databaseRuleManager) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) + .build(); + + DruidCoordinatorRuleRunner runner = new DruidCoordinatorRuleRunner(new ReplicationThrottler(7, 1), coordinator); + DruidCoordinatorRuntimeParams afterParams = runner.run(params); + CoordinatorStats stats = afterParams.getCoordinatorStats(); + + Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 24); + Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get(DruidServer.DEFAULT_TIER).get() == 7); + Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null); + Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null); + + EasyMock.verify(mockPeon); + } + @Test public void testDropReplicantThrottle() throws Exception { diff --git a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java index a52927d0b7d..3e75d1b79c3 100644 --- a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java +++ b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java @@ -59,7 +59,7 @@ public class LoadRuleTest public void setUp() throws Exception { mockPeon = EasyMock.createMock(LoadQueuePeon.class); - throttler = new ReplicationThrottler(1, 1); + throttler = new ReplicationThrottler(2, 1); for (String tier : Arrays.asList("hot", DruidServer.DEFAULT_TIER)) { throttler.updateReplicationState(tier); throttler.updateTerminationState(tier);