make balancer tests take into account loadqueue status

This commit is contained in:
Nelson Ray 2013-01-29 17:13:01 -08:00
parent 6438401a32
commit 034d652569
4 changed files with 147 additions and 55 deletions

View File

@ -50,11 +50,11 @@ public class DruidMasterBalancer implements DruidMasterHelper
} }
} }
); );
private static final EmittingLogger log = new EmittingLogger(DruidMasterBalancer.class); protected static final EmittingLogger log = new EmittingLogger(DruidMasterBalancer.class);
private final DruidMaster master; protected final DruidMaster master;
private final Map<String, ConcurrentHashMap<String, BalancerSegmentHolder>> currentlyMovingSegments = Maps.newHashMap(); protected final Map<String, ConcurrentHashMap<String, BalancerSegmentHolder>> currentlyMovingSegments = Maps.newHashMap();
public DruidMasterBalancer( public DruidMasterBalancer(
DruidMaster master DruidMaster master
@ -63,7 +63,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
this.master = master; this.master = master;
} }
private void reduceLifetimes(String tier) protected void reduceLifetimes(String tier)
{ {
for (BalancerSegmentHolder holder : currentlyMovingSegments.get(tier).values()) { for (BalancerSegmentHolder holder : currentlyMovingSegments.get(tier).values()) {
holder.reduceLifetime(); holder.reduceLifetime();
@ -159,7 +159,7 @@ public class DruidMasterBalancer implements DruidMasterHelper
.build(); .build();
} }
private void moveSegment( protected void moveSegment(
final BalancerSegmentHolder segment, final BalancerSegmentHolder segment,
final DruidServer toServer, final DruidServer toServer,
final DruidMasterRuntimeParams params final DruidMasterRuntimeParams params

View File

@ -28,6 +28,7 @@ import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidDataSource; import com.metamx.druid.client.DruidDataSource;
import com.metamx.druid.client.DruidServer; import com.metamx.druid.client.DruidServer;
import com.metamx.druid.shard.NoneShardSpec; import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.phonebook.PhoneBook;
import junit.framework.Assert; import junit.framework.Assert;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -39,6 +40,7 @@ import org.junit.Test;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
/** /**
*/ */
@ -55,7 +57,6 @@ public class DruidMasterBalancerTest
private DataSegment segment3; private DataSegment segment3;
private DataSegment segment4; private DataSegment segment4;
Map<String, DataSegment> segments; Map<String, DataSegment> segments;
private LoadQueuePeon peon;
private DruidDataSource dataSource; private DruidDataSource dataSource;
@Before @Before
@ -70,7 +71,6 @@ public class DruidMasterBalancerTest
segment2 = EasyMock.createMock(DataSegment.class); segment2 = EasyMock.createMock(DataSegment.class);
segment3 = EasyMock.createMock(DataSegment.class); segment3 = EasyMock.createMock(DataSegment.class);
segment4 = EasyMock.createMock(DataSegment.class); segment4 = EasyMock.createMock(DataSegment.class);
peon = EasyMock.createMock(LoadQueuePeon.class);
dataSource = EasyMock.createMock(DruidDataSource.class); dataSource = EasyMock.createMock(DruidDataSource.class);
DateTime start1 = new DateTime("2012-01-01"); DateTime start1 = new DateTime("2012-01-01");
@ -118,10 +118,10 @@ public class DruidMasterBalancerTest
); );
segments = new HashMap<String, DataSegment>(); segments = new HashMap<String, DataSegment>();
segments.put("segment1", segment1); segments.put("datasource1_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z", segment1);
segments.put("segment2", segment2); segments.put("datasource1_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z", segment2);
segments.put("segment3", segment3); segments.put("datasource2_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z", segment3);
segments.put("segment4", segment4); segments.put("datasource2_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z", segment4);
} }
@After @After
@ -132,7 +132,6 @@ public class DruidMasterBalancerTest
EasyMock.verify(druidServer2); EasyMock.verify(druidServer2);
EasyMock.verify(druidServer3); EasyMock.verify(druidServer3);
EasyMock.verify(druidServer4); EasyMock.verify(druidServer4);
EasyMock.verify(peon);
EasyMock.verify(dataSource); EasyMock.verify(dataSource);
} }
@ -146,6 +145,26 @@ public class DruidMasterBalancerTest
EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce(); EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer1.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes(); EasyMock.expect(druidServer1.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes();
EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes(); EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes();
EasyMock.expect(
druidServer1.getSegment(
"datasource1_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z"
)
).andReturn(null).anyTimes();
EasyMock.expect(
druidServer1.getSegment(
"datasource1_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z"
)
).andReturn(null).anyTimes();
EasyMock.expect(
druidServer1.getSegment(
"datasource2_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z"
)
).andReturn(null).anyTimes();
EasyMock.expect(
druidServer1.getSegment(
"datasource2_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z"
)
).andReturn(null).anyTimes();
EasyMock.replay(druidServer1); EasyMock.replay(druidServer1);
EasyMock.expect(druidServer2.getName()).andReturn("to").atLeastOnce(); EasyMock.expect(druidServer2.getName()).andReturn("to").atLeastOnce();
@ -154,29 +173,8 @@ public class DruidMasterBalancerTest
EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce(); EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes(); EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
EasyMock.expect(druidServer2.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes(); EasyMock.expect(druidServer2.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes();
EasyMock.expect(
druidServer2.getSegment(
"datasource1_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z"
)
).andReturn(null).anyTimes();
EasyMock.expect(
druidServer2.getSegment(
"datasource1_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z"
)
).andReturn(null).anyTimes();
EasyMock.expect(
druidServer2.getSegment(
"datasource2_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z"
)
).andReturn(null).anyTimes();
EasyMock.expect(
druidServer2.getSegment(
"datasource2_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z"
)
).andReturn(null).anyTimes();
EasyMock.expect(druidServer2.getSegment("segment3")).andReturn(null).anyTimes();
EasyMock.expect(druidServer2.getSegment("segment4")).andReturn(null).anyTimes();
EasyMock.replay(druidServer2); EasyMock.replay(druidServer2);
EasyMock.replay(druidServer3); EasyMock.replay(druidServer3);
EasyMock.replay(druidServer4); EasyMock.replay(druidServer4);
@ -198,12 +196,11 @@ public class DruidMasterBalancerTest
EasyMock.<String>anyObject(), EasyMock.<String>anyObject(),
EasyMock.<LoadPeonCallback>anyObject() EasyMock.<LoadPeonCallback>anyObject()
); );
EasyMock.expectLastCall().atLeastOnce(); EasyMock.expectLastCall().anyTimes();
EasyMock.replay(master); EasyMock.replay(master);
EasyMock.expect(peon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); LoadQueuePeonTester fromPeon = new LoadQueuePeonTester(EasyMock.createMock(PhoneBook.class), "from", EasyMock.createMock(ScheduledExecutorService.class));
EasyMock.expect(peon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce(); LoadQueuePeonTester toPeon = new LoadQueuePeonTester(EasyMock.createMock(PhoneBook.class), "to", EasyMock.createMock(ScheduledExecutorService.class));
EasyMock.replay(peon);
DruidMasterRuntimeParams params = DruidMasterRuntimeParams params =
DruidMasterRuntimeParams.newBuilder() DruidMasterRuntimeParams.newBuilder()
@ -211,23 +208,23 @@ public class DruidMasterBalancerTest
new DruidCluster( new DruidCluster(
ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of( ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
"normal", "normal",
MinMaxPriorityQueue.orderedBy(DruidMasterBalancer.percentUsedComparator) MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator)
.create( .create(
Arrays.asList( Arrays.asList(
new ServerHolder(druidServer1, peon), new ServerHolder(druidServer1, fromPeon),
new ServerHolder(druidServer2, peon) new ServerHolder(druidServer2, toPeon)
) )
) )
) )
) )
) )
.withLoadManagementPeons(ImmutableMap.of("from", peon, "to", peon)) .withLoadManagementPeons(ImmutableMap.<String, LoadQueuePeon>of("from", fromPeon, "to", toPeon))
.withAvailableSegments(segments.values()) .withAvailableSegments(segments.values())
.withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE) .withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build(); .build();
params = new DruidMasterBalancer(master).run(params); params = new DruidMasterBalancerTester(master).run(params);
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0); Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
} }
@ -345,12 +342,13 @@ public class DruidMasterBalancerTest
EasyMock.<String>anyObject(), EasyMock.<String>anyObject(),
EasyMock.<LoadPeonCallback>anyObject() EasyMock.<LoadPeonCallback>anyObject()
); );
EasyMock.expectLastCall().atLeastOnce(); EasyMock.expectLastCall().anyTimes();
EasyMock.replay(master); EasyMock.replay(master);
EasyMock.expect(peon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); LoadQueuePeonTester peon1 = new LoadQueuePeonTester(EasyMock.createMock(PhoneBook.class), "1", EasyMock.createMock(ScheduledExecutorService.class));
EasyMock.expect(peon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce(); LoadQueuePeonTester peon2 = new LoadQueuePeonTester(EasyMock.createMock(PhoneBook.class), "2", EasyMock.createMock(ScheduledExecutorService.class));
EasyMock.replay(peon); LoadQueuePeonTester peon3 = new LoadQueuePeonTester(EasyMock.createMock(PhoneBook.class), "3", EasyMock.createMock(ScheduledExecutorService.class));
LoadQueuePeonTester peon4 = new LoadQueuePeonTester(EasyMock.createMock(PhoneBook.class), "4", EasyMock.createMock(ScheduledExecutorService.class));
DruidMasterRuntimeParams params = DruidMasterRuntimeParams params =
DruidMasterRuntimeParams.newBuilder() DruidMasterRuntimeParams.newBuilder()
@ -358,25 +356,25 @@ public class DruidMasterBalancerTest
new DruidCluster( new DruidCluster(
ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of( ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
"normal", "normal",
MinMaxPriorityQueue.orderedBy(DruidMasterBalancer.percentUsedComparator) MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator)
.create( .create(
Arrays.asList( Arrays.asList(
new ServerHolder(druidServer1, peon), new ServerHolder(druidServer1, peon1),
new ServerHolder(druidServer2, peon), new ServerHolder(druidServer2, peon2),
new ServerHolder(druidServer3, peon), new ServerHolder(druidServer3, peon3),
new ServerHolder(druidServer4, peon) new ServerHolder(druidServer4, peon4)
) )
) )
) )
) )
) )
.withLoadManagementPeons(ImmutableMap.of("1", peon, "2", peon, "3", peon, "4", peon)) .withLoadManagementPeons(ImmutableMap.<String, LoadQueuePeon>of("1", peon1, "2", peon2, "3", peon3, "4", peon4))
.withAvailableSegments(segments.values()) .withAvailableSegments(segments.values())
.withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE) .withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
.withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
.build(); .build();
params = new DruidMasterBalancer(master).run(params); params = new DruidMasterBalancerTester(master).run(params);
Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0); Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
} }
} }

View File

@ -0,0 +1,57 @@
package com.metamx.druid.master;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidServer;
public class DruidMasterBalancerTester extends DruidMasterBalancer
{
public DruidMasterBalancerTester(DruidMaster master)
{
super(master);
}
@Override
protected void moveSegment(
final BalancerSegmentHolder segment,
final DruidServer toServer,
final DruidMasterRuntimeParams params
)
{
final String toServerName = toServer.getName();
final LoadQueuePeon toPeon = params.getLoadManagementPeons().get(toServerName);
final String fromServerName = segment.getFromServer().getName();
final DataSegment segmentToMove = segment.getSegment();
final String segmentName = segmentToMove.getIdentifier();
if (!toPeon.getSegmentsToLoad().contains(segmentToMove) &&
!currentlyMovingSegments.get("normal").containsKey(segmentName) &&
!toServer.getSegments().containsKey(segmentName) &&
new ServerHolder(toServer, toPeon).getAvailableSize() > segmentToMove.getSize()) {
log.info(
"Moving [%s] from [%s] to [%s]",
segmentName,
fromServerName,
toServerName
);
try {
final LoadQueuePeon loadPeon = params.getLoadManagementPeons().get(toServerName);
loadPeon.loadSegment(segment.getSegment(), new LoadPeonCallback()
{
@Override
protected void execute()
{
}
});
currentlyMovingSegments.get("normal").put(segmentName, segment);
}
catch (Exception e) {
log.info(e, String.format("[%s] : Moving exception", segmentName));
}
} else {
currentlyMovingSegments.get("normal").remove(segment);
}
}
}

View File

@ -0,0 +1,37 @@
package com.metamx.druid.master;
import com.metamx.druid.client.DataSegment;
import com.metamx.phonebook.PhoneBook;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledExecutorService;
public class LoadQueuePeonTester extends LoadQueuePeon
{
private ConcurrentSkipListSet<DataSegment> segmentsToLoad;
public LoadQueuePeonTester(
PhoneBook yp,
String basePath,
ScheduledExecutorService zkWritingExecutor
)
{
super(yp, basePath, zkWritingExecutor);
}
@Override
public void loadSegment(
DataSegment segment,
LoadPeonCallback callback
)
{
if(segmentsToLoad == null) segmentsToLoad = new ConcurrentSkipListSet<DataSegment>();
segmentsToLoad.add(segment);
}
public ConcurrentSkipListSet<DataSegment> getSegmentsToLoad()
{
if(segmentsToLoad == null) segmentsToLoad = new ConcurrentSkipListSet<DataSegment>();
return segmentsToLoad;
}
}