From 092fe70a35401a873d6bd9f2d4a09eaec333b649 Mon Sep 17 00:00:00 2001 From: Nelson Ray Date: Tue, 2 Apr 2013 11:36:01 -0700 Subject: [PATCH] use reservoir sampling in pickSegmentToMove to avoid IndexOutOfBoundsException when a segment gets dropped mid-run --- .../druid/master/BalancerCostAnalyzer.java | 51 +++++++------------ .../druid/master/DruidMasterBalancer.java | 3 +- .../druid/master/DruidMasterBalancerTest.java | 4 +- 3 files changed, 21 insertions(+), 37 deletions(-) diff --git a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java index a479d33f2df..0d49c3a3bee 100644 --- a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java +++ b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java @@ -144,46 +144,31 @@ public class BalancerCostAnalyzer } /** - * Sample from each server with probability proportional to the number of segments on that server. + * The balancing application requires us to pick a proposal segment uniformly at random from the set of + * all servers. We use reservoir sampling to do this. * * @param serverHolders A list of ServerHolders for a particular tier. - * @param numSegments - * - * @return A ServerHolder sampled with probability proportional to the - * number of segments on that server - */ - private ServerHolder sampleServer(final List serverHolders, final int numSegments) - { - final int num = rand.nextInt(numSegments); - int cumulativeSegments = 0; - int numToStopAt = 0; - - while (cumulativeSegments <= num) { - cumulativeSegments += serverHolders.get(numToStopAt).getServer().getSegments().size(); - numToStopAt++; - } - - return serverHolders.get(numToStopAt - 1); - } - - /** - * The balancing application requires us to pick a proposal segment. - * - * @param serverHolders A list of ServerHolders for a particular tier. - * @param numSegments The total number of segments on a particular tier. * * @return A BalancerSegmentHolder sampled uniformly at random. */ - public BalancerSegmentHolder pickSegmentToMove(final List serverHolders, final int numSegments) + public BalancerSegmentHolder pickSegmentToMove(final List serverHolders) { - /** We want to sample from each server w.p. numSegmentsOnServer / totalSegments */ - ServerHolder fromServerHolder = sampleServer(serverHolders, numSegments); + ServerHolder fromServerHolder = null; + DataSegment proposalSegment = null; + int numSoFar = 0; - /** and actually pick that segment uniformly at random w.p. 1 / numSegmentsOnServer - so that the probability of picking a segment is 1 / totalSegments. */ - List segments = Lists.newArrayList(fromServerHolder.getServer().getSegments().values()); + for (ServerHolder server : serverHolders) { + for (DataSegment segment : server.getServer().getSegments().values()) { + int randNum = rand.nextInt(numSoFar + 1); + // w.p. 1 / (numSoFar + 1), swap out the server and segment + if (randNum == numSoFar) { + fromServerHolder = server; + proposalSegment = segment; + numSoFar++; + } + } + } - DataSegment proposalSegment = segments.get(rand.nextInt(segments.size())); return new BalancerSegmentHolder(fromServerHolder.getServer(), proposalSegment); } @@ -214,7 +199,7 @@ public class BalancerCostAnalyzer } /** - * For assigment, we want to move to the lowest cost server that isn't already serving the segment. + * For assignment, we want to move to the lowest cost server that isn't already serving the segment. * * @param proposalSegment A DataSegment that we are proposing to move. * @param serverHolders An iterable of ServerHolders for a particular tier. diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java index aa0025749c5..d2e548fd10f 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java @@ -28,7 +28,6 @@ import com.metamx.druid.client.DruidServer; import com.metamx.emitter.EmittingLogger; import org.joda.time.DateTime; -import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -116,7 +115,7 @@ public class DruidMasterBalancer implements DruidMasterHelper int iter = 0; while (iter < maxSegmentsToMove) { iter++; - final BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList, numSegments); + final BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList); final ServerHolder holder = analyzer.findNewSegmentHomeBalance(segmentToMove.getSegment(), serverHolderList); if (holder == null) { continue; diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java index 2ff1dade115..74f344bc467 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java @@ -193,7 +193,7 @@ public class DruidMasterBalancerTest .build(); params = new DruidMasterBalancerTester(master).run(params); - Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() == 3); + Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0); } @Test @@ -271,6 +271,6 @@ public class DruidMasterBalancerTest .build(); params = new DruidMasterBalancerTester(master).run(params); - Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() == 4); + Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0); } }