diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java index 450da27165e..f2a4ffeff44 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java @@ -50,11 +50,11 @@ public class DruidMasterBalancer implements DruidMasterHelper } } ); - private static final EmittingLogger log = new EmittingLogger(DruidMasterBalancer.class); + protected static final EmittingLogger log = new EmittingLogger(DruidMasterBalancer.class); - private final DruidMaster master; + protected final DruidMaster master; - private final Map> currentlyMovingSegments = Maps.newHashMap(); + protected final Map> currentlyMovingSegments = Maps.newHashMap(); public DruidMasterBalancer( DruidMaster master @@ -63,7 +63,7 @@ public class DruidMasterBalancer implements DruidMasterHelper this.master = master; } - private void reduceLifetimes(String tier) + protected void reduceLifetimes(String tier) { for (BalancerSegmentHolder holder : currentlyMovingSegments.get(tier).values()) { holder.reduceLifetime(); @@ -159,7 +159,7 @@ public class DruidMasterBalancer implements DruidMasterHelper .build(); } - private void moveSegment( + protected void moveSegment( final BalancerSegmentHolder segment, final DruidServer toServer, final DruidMasterRuntimeParams params diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java index 3470f53633e..6aa2814a97f 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTest.java @@ -28,6 +28,7 @@ import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DruidDataSource; import com.metamx.druid.client.DruidServer; import com.metamx.druid.shard.NoneShardSpec; +import com.metamx.phonebook.PhoneBook; import junit.framework.Assert; import org.easymock.EasyMock; import org.joda.time.DateTime; @@ -39,6 +40,7 @@ import org.junit.Test; import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; /** */ @@ -55,7 +57,6 @@ public class DruidMasterBalancerTest private DataSegment segment3; private DataSegment segment4; Map segments; - private LoadQueuePeon peon; private DruidDataSource dataSource; @Before @@ -70,7 +71,6 @@ public class DruidMasterBalancerTest segment2 = EasyMock.createMock(DataSegment.class); segment3 = EasyMock.createMock(DataSegment.class); segment4 = EasyMock.createMock(DataSegment.class); - peon = EasyMock.createMock(LoadQueuePeon.class); dataSource = EasyMock.createMock(DruidDataSource.class); DateTime start1 = new DateTime("2012-01-01"); @@ -118,10 +118,10 @@ public class DruidMasterBalancerTest ); segments = new HashMap(); - segments.put("segment1", segment1); - segments.put("segment2", segment2); - segments.put("segment3", segment3); - segments.put("segment4", segment4); + segments.put("datasource1_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z", segment1); + segments.put("datasource1_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z", segment2); + segments.put("datasource2_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z", segment3); + segments.put("datasource2_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z", segment4); } @After @@ -132,7 +132,6 @@ public class DruidMasterBalancerTest EasyMock.verify(druidServer2); EasyMock.verify(druidServer3); EasyMock.verify(druidServer4); - EasyMock.verify(peon); EasyMock.verify(dataSource); } @@ -146,6 +145,26 @@ public class DruidMasterBalancerTest EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce(); EasyMock.expect(druidServer1.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes(); EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes(); + EasyMock.expect( + druidServer1.getSegment( + "datasource1_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z" + ) + ).andReturn(null).anyTimes(); + EasyMock.expect( + druidServer1.getSegment( + "datasource1_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z" + ) + ).andReturn(null).anyTimes(); + EasyMock.expect( + druidServer1.getSegment( + "datasource2_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z" + ) + ).andReturn(null).anyTimes(); + EasyMock.expect( + druidServer1.getSegment( + "datasource2_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z" + ) + ).andReturn(null).anyTimes(); EasyMock.replay(druidServer1); EasyMock.expect(druidServer2.getName()).andReturn("to").atLeastOnce(); @@ -154,29 +173,8 @@ public class DruidMasterBalancerTest EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce(); EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap()).anyTimes(); EasyMock.expect(druidServer2.getDataSources()).andReturn(Arrays.asList(dataSource)).anyTimes(); - EasyMock.expect( - druidServer2.getSegment( - "datasource1_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z" - ) - ).andReturn(null).anyTimes(); - EasyMock.expect( - druidServer2.getSegment( - "datasource1_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z" - ) - ).andReturn(null).anyTimes(); - EasyMock.expect( - druidServer2.getSegment( - "datasource2_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z" - ) - ).andReturn(null).anyTimes(); - EasyMock.expect( - druidServer2.getSegment( - "datasource2_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z" - ) - ).andReturn(null).anyTimes(); - EasyMock.expect(druidServer2.getSegment("segment3")).andReturn(null).anyTimes(); - EasyMock.expect(druidServer2.getSegment("segment4")).andReturn(null).anyTimes(); EasyMock.replay(druidServer2); + EasyMock.replay(druidServer3); EasyMock.replay(druidServer4); @@ -198,12 +196,11 @@ public class DruidMasterBalancerTest EasyMock.anyObject(), EasyMock.anyObject() ); - EasyMock.expectLastCall().atLeastOnce(); + EasyMock.expectLastCall().anyTimes(); EasyMock.replay(master); - EasyMock.expect(peon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); - EasyMock.expect(peon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); - EasyMock.replay(peon); + LoadQueuePeonTester fromPeon = new LoadQueuePeonTester(EasyMock.createMock(PhoneBook.class), "from", EasyMock.createMock(ScheduledExecutorService.class)); + LoadQueuePeonTester toPeon = new LoadQueuePeonTester(EasyMock.createMock(PhoneBook.class), "to", EasyMock.createMock(ScheduledExecutorService.class)); DruidMasterRuntimeParams params = DruidMasterRuntimeParams.newBuilder() @@ -211,27 +208,27 @@ public class DruidMasterBalancerTest new DruidCluster( ImmutableMap.>of( "normal", - MinMaxPriorityQueue.orderedBy(DruidMasterBalancer.percentUsedComparator) + MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator) .create( Arrays.asList( - new ServerHolder(druidServer1, peon), - new ServerHolder(druidServer2, peon) + new ServerHolder(druidServer1, fromPeon), + new ServerHolder(druidServer2, toPeon) ) ) ) ) ) - .withLoadManagementPeons(ImmutableMap.of("from", peon, "to", peon)) + .withLoadManagementPeons(ImmutableMap.of("from", fromPeon, "to", toPeon)) .withAvailableSegments(segments.values()) .withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); - params = new DruidMasterBalancer(master).run(params); + params = new DruidMasterBalancerTester(master).run(params); Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0); } - @Test + @Test public void testRun2() { // Mock some servers of different usages @@ -345,12 +342,13 @@ public class DruidMasterBalancerTest EasyMock.anyObject(), EasyMock.anyObject() ); - EasyMock.expectLastCall().atLeastOnce(); + EasyMock.expectLastCall().anyTimes(); EasyMock.replay(master); - EasyMock.expect(peon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); - EasyMock.expect(peon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); - EasyMock.replay(peon); + LoadQueuePeonTester peon1 = new LoadQueuePeonTester(EasyMock.createMock(PhoneBook.class), "1", EasyMock.createMock(ScheduledExecutorService.class)); + LoadQueuePeonTester peon2 = new LoadQueuePeonTester(EasyMock.createMock(PhoneBook.class), "2", EasyMock.createMock(ScheduledExecutorService.class)); + LoadQueuePeonTester peon3 = new LoadQueuePeonTester(EasyMock.createMock(PhoneBook.class), "3", EasyMock.createMock(ScheduledExecutorService.class)); + LoadQueuePeonTester peon4 = new LoadQueuePeonTester(EasyMock.createMock(PhoneBook.class), "4", EasyMock.createMock(ScheduledExecutorService.class)); DruidMasterRuntimeParams params = DruidMasterRuntimeParams.newBuilder() @@ -358,25 +356,25 @@ public class DruidMasterBalancerTest new DruidCluster( ImmutableMap.>of( "normal", - MinMaxPriorityQueue.orderedBy(DruidMasterBalancer.percentUsedComparator) + MinMaxPriorityQueue.orderedBy(DruidMasterBalancerTester.percentUsedComparator) .create( Arrays.asList( - new ServerHolder(druidServer1, peon), - new ServerHolder(druidServer2, peon), - new ServerHolder(druidServer3, peon), - new ServerHolder(druidServer4, peon) + new ServerHolder(druidServer1, peon1), + new ServerHolder(druidServer2, peon2), + new ServerHolder(druidServer3, peon3), + new ServerHolder(druidServer4, peon4) ) ) ) ) ) - .withLoadManagementPeons(ImmutableMap.of("1", peon, "2", peon, "3", peon, "4", peon)) + .withLoadManagementPeons(ImmutableMap.of("1", peon1, "2", peon2, "3", peon3, "4", peon4)) .withAvailableSegments(segments.values()) .withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); - params = new DruidMasterBalancer(master).run(params); + params = new DruidMasterBalancerTester(master).run(params); Assert.assertTrue(params.getMasterStats().getPerTierStats().get("movedCount").get("normal").get() > 0); } } diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTester.java b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTester.java new file mode 100644 index 00000000000..3b5865fb361 --- /dev/null +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterBalancerTester.java @@ -0,0 +1,57 @@ +package com.metamx.druid.master; + +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.client.DruidServer; + +public class DruidMasterBalancerTester extends DruidMasterBalancer +{ + public DruidMasterBalancerTester(DruidMaster master) + { + super(master); + } + + @Override + protected void moveSegment( + final BalancerSegmentHolder segment, + final DruidServer toServer, + final DruidMasterRuntimeParams params + ) + { + final String toServerName = toServer.getName(); + final LoadQueuePeon toPeon = params.getLoadManagementPeons().get(toServerName); + + final String fromServerName = segment.getFromServer().getName(); + final DataSegment segmentToMove = segment.getSegment(); + final String segmentName = segmentToMove.getIdentifier(); + + if (!toPeon.getSegmentsToLoad().contains(segmentToMove) && + !currentlyMovingSegments.get("normal").containsKey(segmentName) && + !toServer.getSegments().containsKey(segmentName) && + new ServerHolder(toServer, toPeon).getAvailableSize() > segmentToMove.getSize()) { + log.info( + "Moving [%s] from [%s] to [%s]", + segmentName, + fromServerName, + toServerName + ); + try { + final LoadQueuePeon loadPeon = params.getLoadManagementPeons().get(toServerName); + + loadPeon.loadSegment(segment.getSegment(), new LoadPeonCallback() + { + @Override + protected void execute() + { + } + }); + + currentlyMovingSegments.get("normal").put(segmentName, segment); + } + catch (Exception e) { + log.info(e, String.format("[%s] : Moving exception", segmentName)); + } + } else { + currentlyMovingSegments.get("normal").remove(segment); + } + } +} diff --git a/server/src/test/java/com/metamx/druid/master/LoadQueuePeonTester.java b/server/src/test/java/com/metamx/druid/master/LoadQueuePeonTester.java new file mode 100644 index 00000000000..9363bd2e95c --- /dev/null +++ b/server/src/test/java/com/metamx/druid/master/LoadQueuePeonTester.java @@ -0,0 +1,37 @@ +package com.metamx.druid.master; + +import com.metamx.druid.client.DataSegment; +import com.metamx.phonebook.PhoneBook; + +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ScheduledExecutorService; + +public class LoadQueuePeonTester extends LoadQueuePeon +{ + private ConcurrentSkipListSet segmentsToLoad; + + public LoadQueuePeonTester( + PhoneBook yp, + String basePath, + ScheduledExecutorService zkWritingExecutor + ) + { + super(yp, basePath, zkWritingExecutor); + } + + @Override + public void loadSegment( + DataSegment segment, + LoadPeonCallback callback + ) + { + if(segmentsToLoad == null) segmentsToLoad = new ConcurrentSkipListSet(); + segmentsToLoad.add(segment); + } + + public ConcurrentSkipListSet getSegmentsToLoad() + { + if(segmentsToLoad == null) segmentsToLoad = new ConcurrentSkipListSet(); + return segmentsToLoad; + } +}