make sampling of segments uniformly at random and add unit test

This commit is contained in:
Nelson Ray 2013-01-07 12:51:51 -08:00
parent 2ea164acd9
commit a7553776fa
2 changed files with 288 additions and 72 deletions

View File

@ -56,6 +56,7 @@ public class BalancerCostAnalyzer
private double initialTotalCost;
private double normalization;
private double totalCostChange;
private int totalSegments;
public BalancerCostAnalyzer(DateTime referenceTimestamp)
{
@ -64,11 +65,12 @@ public class BalancerCostAnalyzer
totalCostChange = 0;
}
public void init(List<ServerHolder> serverHolderList)
public void init(List<ServerHolder> serverHolderList, DruidMasterRuntimeParams params)
{
this.initialTotalCost = calculateInitialTotalCost(serverHolderList);
this.normalization = calculateNormalization(serverHolderList);
this.serverHolderList = serverHolderList;
this.totalSegments = params.getAvailableSegments().size();
}
public double getInitialTotalCost()
@ -317,6 +319,23 @@ public class BalancerCostAnalyzer
}
/*
* Sample from each server with probability proportional to the number of segments on that server.
*/
private ServerHolder sampleServer()
{
int num = rand.nextInt(totalSegments);
int cumulativeSegments = 0;
int numToStopAt = 0;
while (cumulativeSegments <= num) {
cumulativeSegments += serverHolderList.get(numToStopAt).getServer().getSegments().size();
numToStopAt++;
}
return serverHolderList.get(numToStopAt - 1);
}
public Set<BalancerSegmentHolder> findSegmentsToMove()
{
Set<BalancerSegmentHolder> segmentHoldersToMove = Sets.newHashSet();
@ -326,11 +345,19 @@ public class BalancerCostAnalyzer
while (segmentHoldersToMove.size() < MAX_SEGMENTS_TO_MOVE && counter < 3 * MAX_SEGMENTS_TO_MOVE) {
counter++;
ServerHolder fromServerHolder = serverHolderList.get(rand.nextInt(serverHolderList.size()));
int numServers = serverHolderList.size();
if (numServers == 0) break;
// We want to sample from each server w.p. numSegmentsOnServer / totalSegments
ServerHolder fromServerHolder = sampleServer();
// and actually pick that segment uniformly at random w.p. 1 / numSegmentsOnServer
// so that the probability of picking a segment is 1 / totalSegments.
List<DataSegment> segments = Lists.newArrayList(fromServerHolder.getServer().getSegments().values());
if (segments.size() == 0) {
continue;
}
if (segments.size() == 0) continue;
DataSegment proposalSegment = segments.get(rand.nextInt(segments.size()));
if (movingSegments.contains(proposalSegment)) {
continue;

View File

@ -20,11 +20,14 @@
package com.metamx.druid.master;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Sets;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidDataSource;
import com.metamx.druid.client.DruidServer;
import com.metamx.druid.shard.NoneShardSpec;
import junit.framework.Assert;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
@ -42,73 +45,139 @@ import java.util.Map;
public class DruidMasterBalancerTest
{
private DruidMaster master;
private DruidServer druidServerHigh;
private DruidServer druidServerLow;
private DruidServer druidServer1;
private DruidServer druidServer2;
private DruidServer druidServer3;
private DruidServer druidServer4;
private DataSegment segment1;
private DataSegment segment2;
private DataSegment segment3;
private DataSegment segment4;
Map<String, DataSegment> segments;
private LoadQueuePeon peon;
private DruidDataSource dataSource;
private Map<String, DataSegment> segments;
private static final int DAY_IN_MILLIS = 1000 * 60 * 60 * 24;
@Before
public void setUp() throws Exception
{
master = EasyMock.createMock(DruidMaster.class);
druidServerHigh = EasyMock.createMock(DruidServer.class);
druidServerLow = EasyMock.createMock(DruidServer.class);
druidServer1 = EasyMock.createMock(DruidServer.class);
druidServer2 = EasyMock.createMock(DruidServer.class);
druidServer3 = EasyMock.createMock(DruidServer.class);
druidServer4 = EasyMock.createMock(DruidServer.class);
segment1 = EasyMock.createMock(DataSegment.class);
segment2 = EasyMock.createMock(DataSegment.class);
segment3 = EasyMock.createMock(DataSegment.class);
segment4 = EasyMock.createMock(DataSegment.class);
peon = EasyMock.createMock(LoadQueuePeon.class);
dataSource = EasyMock.createMock(DruidDataSource.class);
DateTime start1 = new DateTime("2012-01-01");
DateTime start2 = new DateTime("2012-02-01");
DateTime version = new DateTime("2012-03-01");
segment1 = new DataSegment(
"datasource1",
new Interval(start1, start1.plusHours(1)),
version.toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
11L
);
segment2 = new DataSegment(
"datasource1",
new Interval(start2, start2.plusHours(1)),
version.toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
7L
);
segment3 = new DataSegment(
"datasource2",
new Interval(start1, start1.plusHours(1)),
version.toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
4L
);
segment4 = new DataSegment(
"datasource2",
new Interval(start2, start2.plusHours(1)),
version.toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
8L
);
segments = new HashMap<String, DataSegment>();
segments.put("segment1", segment1);
segments.put("segment2", segment2);
segments.put("segment3", segment3);
segments.put("segment4", segment4);
}
@After
public void tearDown() throws Exception
{
EasyMock.verify(master);
EasyMock.verify(druidServerHigh);
EasyMock.verify(druidServerLow);
EasyMock.verify(segment1);
EasyMock.verify(segment2);
EasyMock.verify(segment3);
EasyMock.verify(segment4);
EasyMock.verify(druidServer1);
EasyMock.verify(druidServer2);
EasyMock.verify(druidServer3);
EasyMock.verify(druidServer4);
EasyMock.verify(peon);
EasyMock.verify(dataSource);
}
@Test
public void testRun()
public void testRun1()
{
Map<String, DataSegment> segments = new HashMap<String, DataSegment>();
segments.put("segment1", segment1);
segments.put("segment2", segment2);
segments.put("segment3", segment3);
segments.put("segment4", segment4);
// Mock some servers of different usages
EasyMock.expect(druidServerHigh.getName()).andReturn("from").atLeastOnce();
EasyMock.expect(druidServerHigh.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServerHigh.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServerHigh.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes();
EasyMock.expect(druidServerHigh.getSegments()).andReturn(segments).anyTimes();
EasyMock.replay(druidServerHigh);
EasyMock.expect(druidServerLow.getName()).andReturn("to").atLeastOnce();
EasyMock.expect(druidServerLow.getTier()).andReturn("normal").atLeastOnce();
EasyMock.expect(druidServerLow.getCurrSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(druidServerLow.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServerLow.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
EasyMock.expect(druidServerLow.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes();
EasyMock.expect(druidServerLow.getSegment("segment1")).andReturn(null).anyTimes();
EasyMock.expect(druidServerLow.getSegment("segment2")).andReturn(null).anyTimes();
EasyMock.expect(druidServerLow.getSegment("segment3")).andReturn(null).anyTimes();
EasyMock.expect(druidServerLow.getSegment("segment4")).andReturn(null).anyTimes();
EasyMock.replay(druidServerLow);
EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce();
EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer1.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes();
EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes();
EasyMock.replay(druidServer1);
EasyMock.expect(druidServer2.getName()).andReturn("to").atLeastOnce();
EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
EasyMock.expect(druidServer2.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes();
EasyMock.expect(
druidServer2.getSegment(
"datasource1_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z"
)
).andReturn(null).anyTimes();
EasyMock.expect(
druidServer2.getSegment(
"datasource1_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z"
)
).andReturn(null).anyTimes();
EasyMock.expect(
druidServer2.getSegment(
"datasource2_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z"
)
).andReturn(null).anyTimes();
EasyMock.expect(
druidServer2.getSegment(
"datasource2_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z"
)
).andReturn(null).anyTimes();
EasyMock.expect(druidServer2.getSegment("segment3")).andReturn(null).anyTimes();
EasyMock.expect(druidServer2.getSegment("segment4")).andReturn(null).anyTimes();
EasyMock.replay(druidServer2);
EasyMock.replay(druidServer3);
EasyMock.replay(druidServer4);
// Mock a datasource
EasyMock.expect(dataSource.getSegments()).andReturn(
@ -121,35 +190,6 @@ public class DruidMasterBalancerTest
).anyTimes();
EasyMock.replay(dataSource);
/*
(2, 1, 2, 1
-, 2, 1, 2
-, -, 2, 1
-, -, -, 2)
*/
// Mock some segments of different sizes
EasyMock.expect(segment1.getSize()).andReturn(11L).anyTimes();
EasyMock.expect(segment1.getIdentifier()).andReturn("segment1").anyTimes();
EasyMock.expect(segment1.getDataSource()).andReturn("datasource1").anyTimes();
EasyMock.expect(segment1.getInterval()).andReturn(new Interval(0, 1 * DAY_IN_MILLIS)).anyTimes();
EasyMock.replay(segment1);
EasyMock.expect(segment2.getSize()).andReturn(7L).anyTimes();
EasyMock.expect(segment2.getIdentifier()).andReturn("segment2").anyTimes();
EasyMock.expect(segment2.getDataSource()).andReturn("datasource1").anyTimes();
EasyMock.expect(segment2.getInterval()).andReturn(new Interval(10 * DAY_IN_MILLIS, 11 * DAY_IN_MILLIS)).anyTimes();
EasyMock.replay(segment2);
EasyMock.expect(segment3.getSize()).andReturn(4L).anyTimes();
EasyMock.expect(segment3.getIdentifier()).andReturn("segment3").anyTimes();
EasyMock.expect(segment3.getDataSource()).andReturn("datasource1").anyTimes();
EasyMock.expect(segment3.getInterval()).andReturn(new Interval(0, 1 * DAY_IN_MILLIS)).anyTimes();
EasyMock.replay(segment3);
EasyMock.expect(segment4.getSize()).andReturn(8L).anyTimes();
EasyMock.expect(segment4.getIdentifier()).andReturn("segment4").anyTimes();
EasyMock.expect(segment4.getDataSource()).andReturn("datasource1").anyTimes();
EasyMock.expect(segment4.getInterval()).andReturn(new Interval(10 * DAY_IN_MILLIS, 11 * DAY_IN_MILLIS)).anyTimes();
EasyMock.replay(segment4);
// Mock stuff that the master needs
master.moveSegment(
EasyMock.<String>anyObject(),
@ -173,14 +213,163 @@ public class DruidMasterBalancerTest
MinMaxPriorityQueue.orderedBy(DruidMasterBalancer.percentUsedComparator)
.create(
Arrays.asList(
new ServerHolder(druidServerHigh, peon),
new ServerHolder(druidServerLow, peon)
new ServerHolder(druidServer1, peon),
new ServerHolder(druidServer2, peon)
)
)
)
)
)
.withLoadManagementPeons(ImmutableMap.of("from", peon, "to", peon))
.withAvailableSegments(segments.values())
.build();
params = new DruidMasterBalancer(master, new BalancerCostAnalyzer(new DateTime("2013-01-01"))).run(params);
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("costChange").get("normal").get() > 0);
}
@Test
public void testRun2()
{
// Mock some servers of different usages
EasyMock.expect(druidServer1.getName()).andReturn("1").atLeastOnce();
EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer1.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes();
EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes();
EasyMock.replay(druidServer1);
EasyMock.expect(druidServer2.getName()).andReturn("2").atLeastOnce();
EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
EasyMock.expect(druidServer2.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes();
EasyMock.expect(
druidServer2.getSegment(
"datasource1_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z"
)
).andReturn(null).anyTimes();
EasyMock.expect(
druidServer2.getSegment(
"datasource1_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z"
)
).andReturn(null).anyTimes();
EasyMock.expect(
druidServer2.getSegment(
"datasource2_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z"
)
).andReturn(null).anyTimes();
EasyMock.expect(
druidServer2.getSegment(
"datasource2_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z"
)
).andReturn(null).anyTimes();
EasyMock.replay(druidServer2);
EasyMock.expect(druidServer3.getName()).andReturn("3").atLeastOnce();
EasyMock.expect(druidServer3.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer3.getCurrSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(druidServer3.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer3.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
EasyMock.expect(druidServer3.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes();
EasyMock.expect(
druidServer3.getSegment(
"datasource1_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z"
)
).andReturn(null).anyTimes();
EasyMock.expect(
druidServer3.getSegment(
"datasource1_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z"
)
).andReturn(null).anyTimes();
EasyMock.expect(
druidServer3.getSegment(
"datasource2_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z"
)
).andReturn(null).anyTimes();
EasyMock.expect(
druidServer3.getSegment(
"datasource2_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z"
)
).andReturn(null).anyTimes();
EasyMock.replay(druidServer3);
EasyMock.expect(druidServer4.getName()).andReturn("4").atLeastOnce();
EasyMock.expect(druidServer4.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer4.getCurrSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(druidServer4.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer4.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
EasyMock.expect(druidServer4.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes();
EasyMock.expect(
druidServer4.getSegment(
"datasource1_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z"
)
).andReturn(null).anyTimes();
EasyMock.expect(
druidServer4.getSegment(
"datasource1_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z"
)
).andReturn(null).anyTimes();
EasyMock.expect(
druidServer4.getSegment(
"datasource2_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z"
)
).andReturn(null).anyTimes();
EasyMock.expect(
druidServer4.getSegment(
"datasource2_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z"
)
).andReturn(null).anyTimes();
EasyMock.replay(druidServer4);
// Mock a datasource
EasyMock.expect(dataSource.getSegments()).andReturn(
Sets.<DataSegment>newHashSet(
segment1,
segment2,
segment3,
segment4
)
).anyTimes();
EasyMock.replay(dataSource);
// Mock stuff that the master needs
master.moveSegment(
EasyMock.<String>anyObject(),
EasyMock.<String>anyObject(),
EasyMock.<String>anyObject(),
EasyMock.<LoadPeonCallback>anyObject()
);
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(master);
EasyMock.expect(peon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(peon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.replay(peon);
DruidMasterRuntimeParams params =
DruidMasterRuntimeParams.newBuilder()
.withDruidCluster(
new DruidCluster(
ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
"normal",
MinMaxPriorityQueue.orderedBy(DruidMasterBalancer.percentUsedComparator)
.create(
Arrays.asList(
new ServerHolder(druidServer1, peon),
new ServerHolder(druidServer2, peon),
new ServerHolder(druidServer3, peon),
new ServerHolder(druidServer4, peon)
)
)
)
)
)
.withLoadManagementPeons(ImmutableMap.of("1", peon, "2", peon, "3", peon, "4", peon))
.withAvailableSegments(segments.values())
.build();
params = new DruidMasterBalancer(master, new BalancerCostAnalyzer(new DateTime("2013-01-01"))).run(params);