From a7553776fac60722f1230c17deffc569194c344d Mon Sep 17 00:00:00 2001
From: Nelson Ray
Date: Mon, 7 Jan 2013 12:51:51 -0800
Subject: [PATCH] make sampling of segments uniformly at random and add unit test

---
 .../druid/master/BalancerCostAnalyzer.java    |  57 +++-
 .../druid/master/DruidMasterBalancerTest.java | 323 ++++++++++++++----
 2 files changed, 308 insertions(+), 72 deletions(-)

diff --git a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java
index 94a55363bc8..3375e4b4c91 100644
--- a/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java
+++ b/server/src/main/java/com/metamx/druid/master/BalancerCostAnalyzer.java
@@ -56,6 +56,7 @@ public class BalancerCostAnalyzer
   private double initialTotalCost;
   private double normalization;
   private double totalCostChange;
+  private int totalSegments;
 
   public BalancerCostAnalyzer(DateTime referenceTimestamp)
   {
@@ -64,11 +65,12 @@ public class BalancerCostAnalyzer
     totalCostChange = 0;
   }
 
-  public void init(List serverHolderList)
+  public void init(List serverHolderList, DruidMasterRuntimeParams params)
   {
     this.initialTotalCost = calculateInitialTotalCost(serverHolderList);
     this.normalization = calculateNormalization(serverHolderList);
     this.serverHolderList = serverHolderList;
+    this.totalSegments = params.getAvailableSegments().size();
   }
 
   public double getInitialTotalCost()
@@ -317,6 +319,43 @@ public class BalancerCostAnalyzer
   }
 
 
+  /*
+   * Sample a server with probability proportional to the number of segments it currently serves.
+   *
+   * The weights are summed from the servers themselves rather than taken from totalSegments. That
+   * field holds the number of available segments, and nothing ties it to the number of segments
+   * actually served (replicas, segments not loaded yet): drawing against it could walk past the end
+   * of serverHolderList, could leave the servers at the tail of the list unreachable, and made
+   * rand.nextInt() throw when there were no available segments.
+   *
+   * Callers must make sure serverHolderList is not empty.
+   */
+  private ServerHolder sampleServer()
+  {
+    final int numServers = serverHolderList.size();
+    final int[] cumulativeSegments = new int[numServers];
+    int servedSegments = 0;
+
+    for (int i = 0; i < numServers; i++) {
+      servedSegments += serverHolderList.get(i).getServer().getSegments().size();
+      cumulativeSegments[i] = servedSegments;
+    }
+
+    if (servedSegments == 0) {
+      // No server holds a segment: any choice is equivalent, the caller skips empty servers.
+      return serverHolderList.get(rand.nextInt(numServers));
+    }
+
+    final int num = rand.nextInt(servedSegments);
+    int chosen = 0;
+    // Servers without segments own an empty slice of the range and are stepped over.
+    while (cumulativeSegments[chosen] <= num) {
+      chosen++;
+    }
+
+    return serverHolderList.get(chosen);
+  }
+
   public Set findSegmentsToMove()
   {
     Set segmentHoldersToMove = Sets.newHashSet();
@@ -326,11 +365,19 @@ public class BalancerCostAnalyzer
 
     while (segmentHoldersToMove.size() < MAX_SEGMENTS_TO_MOVE && counter < 3 * MAX_SEGMENTS_TO_MOVE) {
       counter++;
-      ServerHolder fromServerHolder = serverHolderList.get(rand.nextInt(serverHolderList.size()));
+
+      int numServers = serverHolderList.size();
+      if (numServers == 0) break;
+
+      // We want to sample from each server w.p. numSegmentsOnServer / totalSegments
+      ServerHolder fromServerHolder = sampleServer();
+
+      // and actually pick that segment uniformly at random w.p. 1 / numSegmentsOnServer
+      // so that the probability of picking a segment is 1 / totalSegments.
List segments = Lists.newArrayList(fromServerHolder.getServer().getSegments().values()); - if (segments.size() == 0) { - continue; - } + + if (segments.size() == 0) continue; + DataSegment proposalSegment = segments.get(rand.nextInt(segments.size())); if (movingSegments.contains(proposalSegment)) { continue; diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java index 777b4063a10..551bd424bb4 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java @@ -20,11 +20,14 @@ package com.metamx.druid.master; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.Sets; import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DruidDataSource; import com.metamx.druid.client.DruidServer; +import com.metamx.druid.shard.NoneShardSpec; import junit.framework.Assert; import org.easymock.EasyMock; import org.joda.time.DateTime; @@ -42,73 +45,139 @@ import java.util.Map; public class DruidMasterBalancerTest { private DruidMaster master; - private DruidServer druidServerHigh; - private DruidServer druidServerLow; + private DruidServer druidServer1; + private DruidServer druidServer2; + private DruidServer druidServer3; + private DruidServer druidServer4; private DataSegment segment1; private DataSegment segment2; private DataSegment segment3; private DataSegment segment4; + Map segments; private LoadQueuePeon peon; private DruidDataSource dataSource; - private Map segments; - private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24; @Before public void setUp() throws Exception { master = EasyMock.createMock(DruidMaster.class); - druidServerHigh = EasyMock.createMock(DruidServer.class); - 
druidServerLow = EasyMock.createMock(DruidServer.class); + druidServer1 = EasyMock.createMock(DruidServer.class); + druidServer2 = EasyMock.createMock(DruidServer.class); + druidServer3 = EasyMock.createMock(DruidServer.class); + druidServer4 = EasyMock.createMock(DruidServer.class); segment1 = EasyMock.createMock(DataSegment.class); segment2 = EasyMock.createMock(DataSegment.class); segment3 = EasyMock.createMock(DataSegment.class); segment4 = EasyMock.createMock(DataSegment.class); peon = EasyMock.createMock(LoadQueuePeon.class); dataSource = EasyMock.createMock(DruidDataSource.class); + + DateTime start1 = new DateTime("2012-01-01"); + DateTime start2 = new DateTime("2012-02-01"); + DateTime version = new DateTime("2012-03-01"); + segment1 = new DataSegment( + "datasource1", + new Interval(start1, start1.plusHours(1)), + version.toString(), + Maps.newHashMap(), + Lists.newArrayList(), + Lists.newArrayList(), + new NoneShardSpec(), + 11L + ); + segment2 = new DataSegment( + "datasource1", + new Interval(start2, start2.plusHours(1)), + version.toString(), + Maps.newHashMap(), + Lists.newArrayList(), + Lists.newArrayList(), + new NoneShardSpec(), + 7L + ); + segment3 = new DataSegment( + "datasource2", + new Interval(start1, start1.plusHours(1)), + version.toString(), + Maps.newHashMap(), + Lists.newArrayList(), + Lists.newArrayList(), + new NoneShardSpec(), + 4L + ); + segment4 = new DataSegment( + "datasource2", + new Interval(start2, start2.plusHours(1)), + version.toString(), + Maps.newHashMap(), + Lists.newArrayList(), + Lists.newArrayList(), + new NoneShardSpec(), + 8L + ); + + segments = new HashMap(); + segments.put("segment1", segment1); + segments.put("segment2", segment2); + segments.put("segment3", segment3); + segments.put("segment4", segment4); } @After public void tearDown() throws Exception { EasyMock.verify(master); - EasyMock.verify(druidServerHigh); - EasyMock.verify(druidServerLow); - EasyMock.verify(segment1); - EasyMock.verify(segment2); - 
EasyMock.verify(segment3); - EasyMock.verify(segment4); + EasyMock.verify(druidServer1); + EasyMock.verify(druidServer2); + EasyMock.verify(druidServer3); + EasyMock.verify(druidServer4); EasyMock.verify(peon); EasyMock.verify(dataSource); } @Test - public void testRun() + public void testRun1() { - Map segments = new HashMap(); - segments.put("segment1", segment1); - segments.put("segment2", segment2); - segments.put("segment3", segment3); - segments.put("segment4", segment4); - // Mock some servers of different usages - EasyMock.expect(druidServerHigh.getName()).andReturn("from").atLeastOnce(); - EasyMock.expect(druidServerHigh.getCurrSize()).andReturn(30L).atLeastOnce(); - EasyMock.expect(druidServerHigh.getMaxSize()).andReturn(100L).atLeastOnce(); - EasyMock.expect(druidServerHigh.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes(); - EasyMock.expect(druidServerHigh.getSegments()).andReturn(segments).anyTimes(); - EasyMock.replay(druidServerHigh); - EasyMock.expect(druidServerLow.getName()).andReturn("to").atLeastOnce(); - EasyMock.expect(druidServerLow.getTier()).andReturn("normal").atLeastOnce(); - EasyMock.expect(druidServerLow.getCurrSize()).andReturn(0L).atLeastOnce(); - EasyMock.expect(druidServerLow.getMaxSize()).andReturn(100L).atLeastOnce(); - EasyMock.expect(druidServerLow.getSegments()).andReturn(new HashMap()).anyTimes(); - EasyMock.expect(druidServerLow.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes(); - EasyMock.expect(druidServerLow.getSegment("segment1")).andReturn(null).anyTimes(); - EasyMock.expect(druidServerLow.getSegment("segment2")).andReturn(null).anyTimes(); - EasyMock.expect(druidServerLow.getSegment("segment3")).andReturn(null).anyTimes(); - EasyMock.expect(druidServerLow.getSegment("segment4")).andReturn(null).anyTimes(); - EasyMock.replay(druidServerLow); + EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce(); + EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce(); 
+ EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce(); + EasyMock.expect(druidServer1.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes(); + EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes(); + EasyMock.replay(druidServer1); + + EasyMock.expect(druidServer2.getName()).andReturn("to").atLeastOnce(); + EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes(); + EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce(); + EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce(); + EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap()).anyTimes(); + EasyMock.expect(druidServer2.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes(); + EasyMock.expect( + druidServer2.getSegment( + "datasource1_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z" + ) + ).andReturn(null).anyTimes(); + EasyMock.expect( + druidServer2.getSegment( + "datasource1_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z" + ) + ).andReturn(null).anyTimes(); + EasyMock.expect( + druidServer2.getSegment( + "datasource2_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z" + ) + ).andReturn(null).anyTimes(); + EasyMock.expect( + druidServer2.getSegment( + "datasource2_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z" + ) + ).andReturn(null).anyTimes(); + EasyMock.expect(druidServer2.getSegment("segment3")).andReturn(null).anyTimes(); + EasyMock.expect(druidServer2.getSegment("segment4")).andReturn(null).anyTimes(); + EasyMock.replay(druidServer2); + EasyMock.replay(druidServer3); + EasyMock.replay(druidServer4); // Mock a datasource EasyMock.expect(dataSource.getSegments()).andReturn( @@ -121,35 +190,6 @@ public class DruidMasterBalancerTest ).anyTimes(); EasyMock.replay(dataSource); - /* - (2, 1, 2, 1 - -, 2, 1, 2 - -, -, 2, 1 - -, -, -, 2) - */ - - // Mock some segments of 
different sizes - EasyMock.expect(segment1.getSize()).andReturn(11L).anyTimes(); - EasyMock.expect(segment1.getIdentifier()).andReturn("segment1").anyTimes(); - EasyMock.expect(segment1.getDataSource()).andReturn("datasource1").anyTimes(); - EasyMock.expect(segment1.getInterval()).andReturn(new Interval(0, 1 * DAY_IN_MILLIS)).anyTimes(); - EasyMock.replay(segment1); - EasyMock.expect(segment2.getSize()).andReturn(7L).anyTimes(); - EasyMock.expect(segment2.getIdentifier()).andReturn("segment2").anyTimes(); - EasyMock.expect(segment2.getDataSource()).andReturn("datasource1").anyTimes(); - EasyMock.expect(segment2.getInterval()).andReturn(new Interval(10 * DAY_IN_MILLIS, 11 * DAY_IN_MILLIS)).anyTimes(); - EasyMock.replay(segment2); - EasyMock.expect(segment3.getSize()).andReturn(4L).anyTimes(); - EasyMock.expect(segment3.getIdentifier()).andReturn("segment3").anyTimes(); - EasyMock.expect(segment3.getDataSource()).andReturn("datasource1").anyTimes(); - EasyMock.expect(segment3.getInterval()).andReturn(new Interval(0, 1 * DAY_IN_MILLIS)).anyTimes(); - EasyMock.replay(segment3); - EasyMock.expect(segment4.getSize()).andReturn(8L).anyTimes(); - EasyMock.expect(segment4.getIdentifier()).andReturn("segment4").anyTimes(); - EasyMock.expect(segment4.getDataSource()).andReturn("datasource1").anyTimes(); - EasyMock.expect(segment4.getInterval()).andReturn(new Interval(10 * DAY_IN_MILLIS, 11 * DAY_IN_MILLIS)).anyTimes(); - EasyMock.replay(segment4); - // Mock stuff that the master needs master.moveSegment( EasyMock.anyObject(), @@ -173,14 +213,163 @@ public class DruidMasterBalancerTest MinMaxPriorityQueue.orderedBy(DruidMasterBalancer.percentUsedComparator) .create( Arrays.asList( - new ServerHolder(druidServerHigh, peon), - new ServerHolder(druidServerLow, peon) + new ServerHolder(druidServer1, peon), + new ServerHolder(druidServer2, peon) ) ) ) ) ) .withLoadManagementPeons(ImmutableMap.of("from", peon, "to", peon)) + .withAvailableSegments(segments.values()) + .build(); + + 
params = new DruidMasterBalancer(master, new BalancerCostAnalyzer(new DateTime("2013-01-01"))).run(params); + Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0); + Assert.assertTrue(params.getMasterStats().getPerTierStats().get("costChange").get("normal").get() > 0); + } + + @Test + public void testRun2() + { + // Mock some servers of different usages + EasyMock.expect(druidServer1.getName()).andReturn("1").atLeastOnce(); + EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce(); + EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce(); + EasyMock.expect(druidServer1.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes(); + EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes(); + EasyMock.replay(druidServer1); + + EasyMock.expect(druidServer2.getName()).andReturn("2").atLeastOnce(); + EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes(); + EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce(); + EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce(); + EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap()).anyTimes(); + EasyMock.expect(druidServer2.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes(); + EasyMock.expect( + druidServer2.getSegment( + "datasource1_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z" + ) + ).andReturn(null).anyTimes(); + EasyMock.expect( + druidServer2.getSegment( + "datasource1_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z" + ) + ).andReturn(null).anyTimes(); + EasyMock.expect( + druidServer2.getSegment( + "datasource2_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z" + ) + ).andReturn(null).anyTimes(); + EasyMock.expect( + druidServer2.getSegment( + "datasource2_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z" + ) + 
).andReturn(null).anyTimes(); + EasyMock.replay(druidServer2); + + EasyMock.expect(druidServer3.getName()).andReturn("3").atLeastOnce(); + EasyMock.expect(druidServer3.getTier()).andReturn("normal").anyTimes(); + EasyMock.expect(druidServer3.getCurrSize()).andReturn(0L).atLeastOnce(); + EasyMock.expect(druidServer3.getMaxSize()).andReturn(100L).atLeastOnce(); + EasyMock.expect(druidServer3.getSegments()).andReturn(new HashMap()).anyTimes(); + EasyMock.expect(druidServer3.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes(); + EasyMock.expect( + druidServer3.getSegment( + "datasource1_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z" + ) + ).andReturn(null).anyTimes(); + EasyMock.expect( + druidServer3.getSegment( + "datasource1_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z" + ) + ).andReturn(null).anyTimes(); + EasyMock.expect( + druidServer3.getSegment( + "datasource2_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z" + ) + ).andReturn(null).anyTimes(); + EasyMock.expect( + druidServer3.getSegment( + "datasource2_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z" + ) + ).andReturn(null).anyTimes(); + EasyMock.replay(druidServer3); + + EasyMock.expect(druidServer4.getName()).andReturn("4").atLeastOnce(); + EasyMock.expect(druidServer4.getTier()).andReturn("normal").anyTimes(); + EasyMock.expect(druidServer4.getCurrSize()).andReturn(0L).atLeastOnce(); + EasyMock.expect(druidServer4.getMaxSize()).andReturn(100L).atLeastOnce(); + EasyMock.expect(druidServer4.getSegments()).andReturn(new HashMap()).anyTimes(); + EasyMock.expect(druidServer4.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes(); + EasyMock.expect( + druidServer4.getSegment( + "datasource1_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z" + ) + ).andReturn(null).anyTimes(); + EasyMock.expect( + druidServer4.getSegment( + 
"datasource1_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z" + ) + ).andReturn(null).anyTimes(); + EasyMock.expect( + druidServer4.getSegment( + "datasource2_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z" + ) + ).andReturn(null).anyTimes(); + EasyMock.expect( + druidServer4.getSegment( + "datasource2_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z" + ) + ).andReturn(null).anyTimes(); + EasyMock.replay(druidServer4); + + + // Mock a datasource + EasyMock.expect(dataSource.getSegments()).andReturn( + Sets.newHashSet( + segment1, + segment2, + segment3, + segment4 + ) + ).anyTimes(); + EasyMock.replay(dataSource); + + // Mock stuff that the master needs + master.moveSegment( + EasyMock.anyObject(), + EasyMock.anyObject(), + EasyMock.anyObject(), + EasyMock.anyObject() + ); + EasyMock.expectLastCall().atLeastOnce(); + EasyMock.replay(master); + + EasyMock.expect(peon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); + EasyMock.expect(peon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); + EasyMock.replay(peon); + + DruidMasterRuntimeParams params = + DruidMasterRuntimeParams.newBuilder() + .withDruidCluster( + new DruidCluster( + ImmutableMap.>of( + "normal", + MinMaxPriorityQueue.orderedBy(DruidMasterBalancer.percentUsedComparator) + .create( + Arrays.asList( + new ServerHolder(druidServer1, peon), + new ServerHolder(druidServer2, peon), + new ServerHolder(druidServer3, peon), + new ServerHolder(druidServer4, peon) + ) + ) + ) + ) + ) + .withLoadManagementPeons(ImmutableMap.of("1", peon, "2", peon, "3", peon, "4", peon)) + .withAvailableSegments(segments.values()) .build(); params = new DruidMasterBalancer(master, new BalancerCostAnalyzer(new DateTime("2013-01-01"))).run(params);