From 4dd1e2b59ee3c34d699c51e891dcfeb62ee2db39 Mon Sep 17 00:00:00 2001
From: dgolitsyn
Date: Tue, 8 Aug 2017 17:22:59 +0300
Subject: [PATCH] Do not remove segments from currentlyMovingSegments in
 DruidBalancer if move is impossible or not needed (#4472)

* Do not remove segments that should not be moved from currentlyMovingSegments (such segments are now removed only by callbacks, or never inserted at all)

* Replace putIfAbsent with computeIfAbsent in DruidBalancer

* Refactoring

---
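Note (illustration only, not part of the applied diff): the core idea of this
change is that entries in currentlyMovingSegments are removed only by the
load-peon callback that fires when a move completes; the balancer never removes
them eagerly. If a move turns out to be impossible or unnecessary, the entry is
simply never inserted, so there is nothing to remove. The per-tier map is also
installed with computeIfAbsent instead of a get-null-put sequence. A minimal
standalone sketch of the idiom (class and names below are made up for
illustration, not Druid code):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class MovingSegmentsBookkeeping
    {
      // tier -> (segment name -> segment name), mirroring the shape of currentlyMovingSegments
      private final Map<String, ConcurrentHashMap<String, String>> currentlyMoving = new HashMap<>();

      Runnable startMove(String tier, String segmentName)
      {
        // computeIfAbsent replaces the old pattern:
        //   if (currentlyMoving.get(tier) == null) { currentlyMoving.put(tier, new ConcurrentHashMap<>()); }
        Map<String, String> moving = currentlyMoving.computeIfAbsent(tier, t -> new ConcurrentHashMap<>());
        moving.put(segmentName, segmentName);
        // The returned callback is the only code path that removes the entry.
        return () -> moving.remove(segmentName);
      }
    }
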
 .../helper/DruidCoordinatorBalancer.java      | 163 +++----
 .../DruidCoordinatorBalancerTest.java         | 440 +++++++++---------
 .../DruidCoordinatorBalancerTester.java       |   2 -
 3 files changed, 281 insertions(+), 324 deletions(-)

diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
index 89bf05b4602..679eabe7fe2 100644
--- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
+++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
@@ -20,12 +20,10 @@
 package io.druid.server.coordinator.helper;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.MinMaxPriorityQueue;
 import com.metamx.emitter.EmittingLogger;
 import io.druid.client.ImmutableDruidServer;
 import io.druid.java.util.common.StringUtils;
-import io.druid.java.util.common.guava.Comparators;
 import io.druid.server.coordinator.BalancerSegmentHolder;
 import io.druid.server.coordinator.BalancerStrategy;
 import io.druid.server.coordinator.CoordinatorStats;
@@ -37,6 +35,7 @@ import io.druid.server.coordinator.ServerHolder;
 import io.druid.timeline.DataSegment;
 
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -45,21 +44,15 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
 {
-  public static final Comparator<ServerHolder> percentUsedComparator = Comparators.inverse(
-      new Comparator<ServerHolder>()
-      {
-        @Override
-        public int compare(ServerHolder lhs, ServerHolder rhs)
-        {
-          return lhs.getPercentUsed().compareTo(rhs.getPercentUsed());
-        }
-      }
-  );
+  public static final Comparator<ServerHolder> percentUsedComparator =
+      Comparator.comparing(ServerHolder::getPercentUsed).reversed();
+
   protected static final EmittingLogger log = new EmittingLogger(DruidCoordinatorBalancer.class);
 
   protected final DruidCoordinator coordinator;
 
-  protected final Map<String, ConcurrentHashMap<String, BalancerSegmentHolder>> currentlyMovingSegments = Maps.newHashMap();
+  protected final Map<String, ConcurrentHashMap<String, BalancerSegmentHolder>> currentlyMovingSegments =
+      new HashMap<>();
 
   public DruidCoordinatorBalancer(
       DruidCoordinator coordinator
@@ -85,74 +78,76 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
   public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
   {
     final CoordinatorStats stats = new CoordinatorStats();
+    params.getDruidCluster().getHistoricals().forEach((String tier, MinMaxPriorityQueue<ServerHolder> servers) -> {
+      balanceTier(params, tier, servers, stats);
+    });
+    return params.buildFromExisting().withCoordinatorStats(stats).build();
+  }
+
+  private void balanceTier(
+      DruidCoordinatorRuntimeParams params,
+      String tier,
+      MinMaxPriorityQueue<ServerHolder> servers,
+      CoordinatorStats stats
+  )
+  {
     final BalancerStrategy strategy = params.getBalancerStrategy();
     final int maxSegmentsToMove = params.getCoordinatorDynamicConfig().getMaxSegmentsToMove();
 
-    for (Map.Entry<String, MinMaxPriorityQueue<ServerHolder>> entry :
-        params.getDruidCluster().getHistoricals().entrySet()) {
-      String tier = entry.getKey();
-
-      if (currentlyMovingSegments.get(tier) == null) {
-        currentlyMovingSegments.put(tier, new ConcurrentHashMap<String, BalancerSegmentHolder>());
-      }
-
-      if (!currentlyMovingSegments.get(tier).isEmpty()) {
-        reduceLifetimes(tier);
-        log.info("[%s]: Still waiting on %,d segments to be moved", tier, currentlyMovingSegments.size());
-        continue;
-      }
-
-      final List<ServerHolder> serverHolderList = Lists.newArrayList(entry.getValue());
-
-      if (serverHolderList.size() <= 1) {
-        log.info("[%s]: One or fewer servers found.  Cannot balance.", tier);
-        continue;
-      }
-
-      int numSegments = 0;
-      for (ServerHolder server : serverHolderList) {
-        numSegments += server.getServer().getSegments().size();
-      }
-
-      if (numSegments == 0) {
-        log.info("No segments found.  Cannot balance.");
-        continue;
-      }
-      long unmoved = 0L;
-      for (int iter = 0; iter < maxSegmentsToMove; iter++) {
-        final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(serverHolderList);
-
-        if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) {
-          final ServerHolder holder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), serverHolderList);
-
-          if (holder != null) {
-            moveSegment(segmentToMove, holder.getServer(), params);
-          } else {
-            ++unmoved;
-          }
-        }
-      }
-      if (unmoved == maxSegmentsToMove) {
-        // Cluster should be alive and constantly adjusting
-        log.info("No good moves found in tier [%s]", tier);
-      }
-      stats.addToTieredStat("unmovedCount", tier, unmoved);
-      stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size());
-      if (params.getCoordinatorDynamicConfig().emitBalancingStats()) {
-        strategy.emitStats(tier, stats, serverHolderList);
-      }
-      log.info(
-          "[%s]: Segments Moved: [%d] Segments Let Alone: [%d]",
-          tier,
-          currentlyMovingSegments.get(tier).size(),
-          unmoved
-      );
+    currentlyMovingSegments.computeIfAbsent(tier, t -> new ConcurrentHashMap<>());
+
+    if (!currentlyMovingSegments.get(tier).isEmpty()) {
+      reduceLifetimes(tier);
+      log.info("[%s]: Still waiting on %,d segments to be moved", tier, currentlyMovingSegments.get(tier).size());
+      return;
     }
-    return params.buildFromExisting()
-                 .withCoordinatorStats(stats)
-                 .build();
+
+    final List<ServerHolder> serverHolderList = Lists.newArrayList(servers);
+
+    if (serverHolderList.size() <= 1) {
+      log.info("[%s]: One or fewer servers found.  Cannot balance.", tier);
+      return;
+    }
+
+    int numSegments = 0;
+    for (ServerHolder server : serverHolderList) {
+      numSegments += server.getServer().getSegments().size();
+    }
+
+    if (numSegments == 0) {
+      log.info("No segments found.  Cannot balance.");
+      return;
+    }
Cannot balance."); + return; + } + long unmoved = 0L; + for (int iter = 0; iter < maxSegmentsToMove; iter++) { + final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(serverHolderList); + + if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) { + final ServerHolder holder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), serverHolderList); + + if (holder != null) { + moveSegment(segmentToMove, holder.getServer(), params); + } else { + ++unmoved; + } + } + } + if (unmoved == maxSegmentsToMove) { + // Cluster should be alive and constantly adjusting + log.info("No good moves found in tier [%s]", tier); + } + stats.addToTieredStat("unmovedCount", tier, unmoved); + stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size()); + if (params.getCoordinatorDynamicConfig().emitBalancingStats()) { + strategy.emitStats(tier, stats, serverHolderList); + } + log.info( + "[%s]: Segments Moved: [%d] Segments Let Alone: [%d]", + tier, + currentlyMovingSegments.get(tier).size(), + unmoved + ); } protected void moveSegment( @@ -174,18 +169,9 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper LoadPeonCallback callback = null; try { - currentlyMovingSegments.get(toServer.getTier()).put(segmentName, segment); - callback = new LoadPeonCallback() - { - @Override - public void execute() - { - Map movingSegments = currentlyMovingSegments.get(toServer.getTier()); - if (movingSegments != null) { - movingSegments.remove(segmentName); - } - } - }; + Map movingSegments = currentlyMovingSegments.get(toServer.getTier()); + movingSegments.put(segmentName, segment); + callback = () -> movingSegments.remove(segmentName); coordinator.moveSegment( fromServer, toServer, @@ -199,9 +185,6 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper callback.execute(); } } - } else { - currentlyMovingSegments.get(toServer.getTier()).remove(segmentName); } - } } diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java index 6a5272d899e..ea6ca8b7921 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java @@ -19,6 +19,7 @@ package io.druid.server.coordinator; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -37,10 +38,14 @@ import org.junit.Before; import org.junit.Test; import java.io.IOException; -import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; /** */ @@ -52,11 +57,19 @@ public class DruidCoordinatorBalancerTest private ImmutableDruidServer druidServer2; private ImmutableDruidServer druidServer3; private ImmutableDruidServer druidServer4; + private List druidServers; + private LoadQueuePeonTester peon1; + private LoadQueuePeonTester peon2; + private LoadQueuePeonTester peon3; + private LoadQueuePeonTester peon4; + private List peons; private DataSegment segment1; private DataSegment segment2; private DataSegment segment3; private DataSegment segment4; - Map segments; + private Map 
@@ -78,9 +91,9 @@ public class DruidCoordinatorBalancerTest
         "datasource1",
         new Interval(start1, start1.plusHours(1)),
         version.toString(),
-        Maps.<String, Object>newHashMap(),
-        Lists.<String>newArrayList(),
-        Lists.<String>newArrayList(),
+        Maps.newHashMap(),
+        Lists.newArrayList(),
+        Lists.newArrayList(),
         NoneShardSpec.instance(),
         0,
         11L
@@ -89,9 +102,9 @@ public class DruidCoordinatorBalancerTest
         "datasource1",
         new Interval(start2, start2.plusHours(1)),
         version.toString(),
-        Maps.<String, Object>newHashMap(),
-        Lists.<String>newArrayList(),
-        Lists.<String>newArrayList(),
+        Maps.newHashMap(),
+        Lists.newArrayList(),
+        Lists.newArrayList(),
         NoneShardSpec.instance(),
         0,
         7L
@@ -100,9 +113,9 @@ public class DruidCoordinatorBalancerTest
         "datasource2",
         new Interval(start1, start1.plusHours(1)),
         version.toString(),
-        Maps.<String, Object>newHashMap(),
-        Lists.<String>newArrayList(),
-        Lists.<String>newArrayList(),
+        Maps.newHashMap(),
+        Lists.newArrayList(),
+        Lists.newArrayList(),
         NoneShardSpec.instance(),
         0,
         4L
@@ -111,19 +124,30 @@ public class DruidCoordinatorBalancerTest
         "datasource2",
         new Interval(start2, start2.plusHours(1)),
         version.toString(),
-        Maps.<String, Object>newHashMap(),
-        Lists.<String>newArrayList(),
-        Lists.<String>newArrayList(),
+        Maps.newHashMap(),
+        Lists.newArrayList(),
+        Lists.newArrayList(),
         NoneShardSpec.instance(),
         0,
         8L
     );
-    segments = new HashMap<String, DataSegment>();
+    segments = new HashMap<>();
     segments.put("datasource1_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z", segment1);
     segments.put("datasource1_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z", segment2);
     segments.put("datasource2_2012-01-01T00:00:00.000Z_2012-01-01T01:00:00.000Z_2012-03-01T00:00:00.000Z", segment3);
     segments.put("datasource2_2012-02-01T00:00:00.000Z_2012-02-01T01:00:00.000Z_2012-03-01T00:00:00.000Z", segment4);
+
+    peon1 = new LoadQueuePeonTester();
+    peon2 = new LoadQueuePeonTester();
+    peon3 = new LoadQueuePeonTester();
+    peon4 = new LoadQueuePeonTester();
+
+    druidServers = ImmutableList.of(druidServer1, druidServer2, druidServer3, druidServer4);
+    peons = ImmutableList.of(peon1, peon2, peon3, peon4);
+
+    balancerStrategyExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
+    balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(balancerStrategyExecutor);
   }
 
   @After
@@ -134,166 +158,87 @@ public class DruidCoordinatorBalancerTest
     EasyMock.verify(druidServer2);
     EasyMock.verify(druidServer3);
     EasyMock.verify(druidServer4);
+    balancerStrategyExecutor.shutdownNow();
   }
 
   @Test
   public void testMoveToEmptyServerBalancer() throws IOException
   {
-    EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce();
-    EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
-    EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
-    EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes();
-    EasyMock.expect(druidServer1.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
-    EasyMock.replay(druidServer1);
-
-    EasyMock.expect(druidServer2.getName()).andReturn("to").atLeastOnce();
-    EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
-    EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce();
-    EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
-    EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
-    EasyMock.expect(druidServer2.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
-    EasyMock.replay(druidServer2);
+    mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
+    mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyMap());
 
     EasyMock.replay(druidServer3);
     EasyMock.replay(druidServer4);
 
     // Mock stuff that the coordinator needs
-    coordinator.moveSegment(
-        EasyMock.anyObject(),
-        EasyMock.anyObject(),
-        EasyMock.anyObject(),
-        EasyMock.anyObject()
-    );
-    EasyMock.expectLastCall().anyTimes();
-    EasyMock.replay(coordinator);
+    mockCoordinator(coordinator);
 
-    LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
-    LoadQueuePeonTester toPeon = new LoadQueuePeonTester();
-    ListeningExecutorService exec = MoreExecutors.listeningDecorator(
-        Executors.newFixedThreadPool(1));
-    BalancerStrategy balancerStrategy =
-        new CostBalancerStrategyFactory().createBalancerStrategy(exec);
-
-    DruidCoordinatorRuntimeParams params =
-        DruidCoordinatorRuntimeParams.newBuilder()
-                                     .withDruidCluster(
-                                         new DruidCluster(
-                                             null,
-                                             ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
-                                                 "normal",
-                                                 MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator)
-                                                                    .create(
-                                                                        Arrays.asList(
-                                                                            new ServerHolder(druidServer1, fromPeon),
-                                                                            new ServerHolder(druidServer2, toPeon)
-                                                                        )
-                                                                    )
-                                             )
-                                         )
-                                     )
-                                     .withLoadManagementPeons(
-                                         ImmutableMap.of(
-                                             "from",
-                                             fromPeon,
-                                             "to",
-                                             toPeon
-                                         )
-                                     )
-                                     .withAvailableSegments(segments.values())
-                                     .withDynamicConfigs(
-                                         new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(
-                                             MAX_SEGMENTS_TO_MOVE
-                                         ).build()
-                                     )
-                                     .withBalancerStrategy(balancerStrategy)
-                                     .withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
-                                     .build();
+    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
+        ImmutableList.of(druidServer1, druidServer2),
+        ImmutableList.of(peon1, peon2)
+    ).build();
 
     params = new DruidCoordinatorBalancerTester(coordinator).run(params);
     Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
     Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") < segments.size());
-    exec.shutdown();
   }
 
+  @Test
+  public void testMoveSameSegmentTwice() throws Exception
+  {
+    mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
+    mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyMap());
+
+    EasyMock.replay(druidServer3);
+    EasyMock.replay(druidServer4);
+
+    // Mock stuff that the coordinator needs
+    mockCoordinator(coordinator);
+
+    BalancerStrategy predefinedPickOrderStrategy = new PredefinedPickOrderBalancerStrategy(
+        balancerStrategy,
+        ImmutableList.of(
+            new BalancerSegmentHolder(druidServer1, segment1)
+        )
+    );
+
+    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
+        ImmutableList.of(druidServer1, druidServer2),
+        ImmutableList.of(peon1, peon2)
+    )
+        .withBalancerStrategy(predefinedPickOrderStrategy)
+        .withDynamicConfigs(
+            new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(
+                2
+            ).build()
+        )
+        .build();
+
+    params = new DruidCoordinatorBalancerTester(coordinator).run(params);
+    Assert.assertEquals(1, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
+  }
+
   @Test
   public void testRun1() throws IOException
   {
     // Mock some servers of different usages
-
-    EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce();
-    EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
-    EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
-    EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes();
-    EasyMock.expect(druidServer1.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
-    EasyMock.replay(druidServer1);
-
-    EasyMock.expect(druidServer2.getName()).andReturn("to").atLeastOnce();
-    EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
-    EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce();
-    EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
-    EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
-    EasyMock.expect(druidServer2.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
-    EasyMock.replay(druidServer2);
+    mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
+    mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyMap());
 
     EasyMock.replay(druidServer3);
     EasyMock.replay(druidServer4);
 
     // Mock stuff that the coordinator needs
-    coordinator.moveSegment(
-        EasyMock.anyObject(),
-        EasyMock.anyObject(),
-        EasyMock.anyObject(),
-        EasyMock.anyObject()
-    );
-    EasyMock.expectLastCall().anyTimes();
-    EasyMock.replay(coordinator);
+    mockCoordinator(coordinator);
 
-    ListeningExecutorService exec = MoreExecutors.listeningDecorator(
-        Executors.newFixedThreadPool(1));
-    BalancerStrategy balancerStrategy =
-        new CostBalancerStrategyFactory().createBalancerStrategy(exec);
-
-    LoadQueuePeonTester fromPeon = new LoadQueuePeonTester();
-    LoadQueuePeonTester toPeon = new LoadQueuePeonTester();
-    DruidCoordinatorRuntimeParams params =
-        DruidCoordinatorRuntimeParams.newBuilder()
-                                     .withDruidCluster(
-                                         new DruidCluster(
-                                             null,
-                                             ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
-                                                 "normal",
-                                                 MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator)
-                                                                    .create(
-                                                                        Arrays.asList(
-                                                                            new ServerHolder(druidServer1, fromPeon),
-                                                                            new ServerHolder(druidServer2, toPeon)
-                                                                        )
-                                                                    )
-                                             )
-                                         )
-                                     )
-                                     .withLoadManagementPeons(
-                                         ImmutableMap.of(
-                                             "from",
-                                             fromPeon,
-                                             "to",
-                                             toPeon
-                                         )
-                                     )
-                                     .withAvailableSegments(segments.values())
-                                     .withDynamicConfigs(
-                                         new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
-                                                                               .build()
-                                     )
-                                     .withBalancerStrategy(balancerStrategy)
-                                     .withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
-                                     .build();
+    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
+        ImmutableList.of(druidServer1, druidServer2),
+        ImmutableList.of(peon1, peon2)
+    ).build();
 
     params = new DruidCoordinatorBalancerTester(coordinator).run(params);
     Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
-    exec.shutdown();
   }
 
 
@@ -301,101 +246,132 @@ public class DruidCoordinatorBalancerTest
   public void testRun2() throws IOException
   {
     // Mock some servers of different usages
-    EasyMock.expect(druidServer1.getName()).andReturn("1").atLeastOnce();
-    EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
-    EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
-    EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes();
-    EasyMock.expect(druidServer1.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
-    EasyMock.replay(druidServer1);
-
-    EasyMock.expect(druidServer2.getName()).andReturn("2").atLeastOnce();
-    EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
-    EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce();
-    EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
-    EasyMock.expect(druidServer2.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
-    EasyMock.expect(druidServer2.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
-    EasyMock.replay(druidServer2);
-
-    EasyMock.expect(druidServer3.getName()).andReturn("3").atLeastOnce();
-    EasyMock.expect(druidServer3.getTier()).andReturn("normal").anyTimes();
-    EasyMock.expect(druidServer3.getCurrSize()).andReturn(0L).atLeastOnce();
-    EasyMock.expect(druidServer3.getMaxSize()).andReturn(100L).atLeastOnce();
-    EasyMock.expect(druidServer3.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
-    EasyMock.expect(druidServer3.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
-    EasyMock.replay(druidServer3);
-
-    EasyMock.expect(druidServer4.getName()).andReturn("4").atLeastOnce();
-    EasyMock.expect(druidServer4.getTier()).andReturn("normal").anyTimes();
-    EasyMock.expect(druidServer4.getCurrSize()).andReturn(0L).atLeastOnce();
-    EasyMock.expect(druidServer4.getMaxSize()).andReturn(100L).atLeastOnce();
-    EasyMock.expect(druidServer4.getSegments()).andReturn(new HashMap<String, DataSegment>()).anyTimes();
-    EasyMock.expect(druidServer4.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
-    EasyMock.replay(druidServer4);
+    mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
+    mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyMap());
+    mockDruidServer(druidServer3, "3", "normal", 0L, 100L, Collections.emptyMap());
+    mockDruidServer(druidServer4, "4", "normal", 0L, 100L, Collections.emptyMap());
 
     // Mock stuff that the coordinator needs
-    coordinator.moveSegment(
-        EasyMock.anyObject(),
-        EasyMock.anyObject(),
-        EasyMock.anyObject(),
-        EasyMock.anyObject()
-    );
-    EasyMock.expectLastCall().anyTimes();
-    EasyMock.replay(coordinator);
+    mockCoordinator(coordinator);
 
-    LoadQueuePeonTester peon1 = new LoadQueuePeonTester();
-    LoadQueuePeonTester peon2 = new LoadQueuePeonTester();
-    LoadQueuePeonTester peon3 = new LoadQueuePeonTester();
-    LoadQueuePeonTester peon4 = new LoadQueuePeonTester();
-
-    ListeningExecutorService exec = MoreExecutors.listeningDecorator(
-        Executors.newFixedThreadPool(1));
-    BalancerStrategy balancerStrategy =
-        new CostBalancerStrategyFactory().createBalancerStrategy(exec);
-
-    DruidCoordinatorRuntimeParams params =
-        DruidCoordinatorRuntimeParams.newBuilder()
-                                     .withDruidCluster(
-                                         new DruidCluster(
-                                             null,
-                                             ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
-                                                 "normal",
-                                                 MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator)
-                                                                    .create(
-                                                                        Arrays.asList(
-                                                                            new ServerHolder(druidServer1, peon1),
-                                                                            new ServerHolder(druidServer2, peon2),
-                                                                            new ServerHolder(druidServer3, peon3),
-                                                                            new ServerHolder(druidServer4, peon4)
-                                                                        )
-                                                                    )
-                                             )
-                                         )
-                                     )
-                                     .withLoadManagementPeons(
-                                         ImmutableMap.of(
-                                             "1",
-                                             peon1,
-                                             "2",
-                                             peon2,
-                                             "3",
-                                             peon3,
-                                             "4",
-                                             peon4
-                                         )
-                                     )
-                                     .withAvailableSegments(segments.values())
-                                     .withDynamicConfigs(
-                                         new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(
-                                             MAX_SEGMENTS_TO_MOVE
-                                         ).build()
-                                     )
-                                     .withBalancerStrategy(balancerStrategy)
-                                     .withBalancerReferenceTimestamp(new DateTime("2013-01-01"))
-                                     .build();
+    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(druidServers, peons).build();
 
     params = new DruidCoordinatorBalancerTester(coordinator).run(params);
     Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
-    exec.shutdown();
+  }
+
+  private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder(
+      List<ImmutableDruidServer> druidServers,
+      List<LoadQueuePeonTester> peons
+  )
+  {
+    return DruidCoordinatorRuntimeParams
+        .newBuilder()
+        .withDruidCluster(
+            new DruidCluster(
+                null,
+                ImmutableMap.of(
+                    "normal",
+                    MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator)
+                                       .create(
+                                           IntStream
+                                               .range(0, druidServers.size())
+                                               .mapToObj(i -> new ServerHolder(druidServers.get(i), peons.get(i)))
+                                               .collect(Collectors.toList())
+                                       )
+                )
+            )
+        )
+        .withLoadManagementPeons(
+            IntStream
+                .range(0, peons.size())
+                .boxed()
+                .collect(Collectors.toMap(i -> String.valueOf(i + 1), peons::get))
+        )
+        .withAvailableSegments(segments.values())
+        .withDynamicConfigs(
+            new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(
+                MAX_SEGMENTS_TO_MOVE
+            ).build()
+        )
+        .withBalancerStrategy(balancerStrategy)
+        .withBalancerReferenceTimestamp(new DateTime("2013-01-01"));
+  }
+
+  private void mockDruidServer(
+      ImmutableDruidServer druidServer,
+      String name,
+      String tier,
+      long currentSize,
+      long maxSize,
+      Map<String, DataSegment> segments
+  )
+  {
+    EasyMock.expect(druidServer.getName()).andReturn(name).atLeastOnce();
+    EasyMock.expect(druidServer.getTier()).andReturn(tier).anyTimes();
+    EasyMock.expect(druidServer.getCurrSize()).andReturn(currentSize).atLeastOnce();
+    EasyMock.expect(druidServer.getMaxSize()).andReturn(maxSize).atLeastOnce();
+    EasyMock.expect(druidServer.getSegments()).andReturn(segments).anyTimes();
+    EasyMock.expect(druidServer.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
+    EasyMock.replay(druidServer);
+  }
+
+  private void mockCoordinator(DruidCoordinator coordinator)
+  {
+    coordinator.moveSegment(
+        EasyMock.anyObject(),
+        EasyMock.anyObject(),
+        EasyMock.anyObject(),
+        EasyMock.anyObject()
+    );
+    EasyMock.expectLastCall().anyTimes();
+    EasyMock.replay(coordinator);
+  }
+
+  private static class PredefinedPickOrderBalancerStrategy implements BalancerStrategy
+  {
+    private final BalancerStrategy delegate;
+    private final List<BalancerSegmentHolder> pickOrder;
+    private final AtomicInteger pickCounter = new AtomicInteger(0);
+
+    public PredefinedPickOrderBalancerStrategy(
+        BalancerStrategy delegate,
+        List<BalancerSegmentHolder> pickOrder
+    )
+    {
+      this.delegate = delegate;
+      this.pickOrder = pickOrder;
+    }
+
+    @Override
+    public ServerHolder findNewSegmentHomeBalancer(DataSegment proposalSegment, List<ServerHolder> serverHolders)
+    {
+      return delegate.findNewSegmentHomeBalancer(proposalSegment, serverHolders);
+    }
+
+    @Override
+    public ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, List<ServerHolder> serverHolders)
+    {
+      return delegate.findNewSegmentHomeReplicator(proposalSegment, serverHolders);
+    }
+
+    @Override
+    public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders)
+    {
+      return pickOrder.get(pickCounter.getAndIncrement() % pickOrder.size());
+    }
+
+    @Override
+    public void emitStats(String tier, CoordinatorStats stats, List<ServerHolder> serverHolderList)
+    {
+      delegate.emitStats(tier, stats, serverHolderList);
+    }
   }
 }
diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTester.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTester.java
index ef4cb50e9c2..d5a9547c7e2 100644
--- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTester.java
+++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTester.java
@@ -70,8 +70,6 @@ public class DruidCoordinatorBalancerTester extends DruidCoordinatorBalancer
       catch (Exception e) {
        log.info(e, StringUtils.format("[%s] : Moving exception", segmentName));
       }
-    } else {
-      currentlyMovingSegments.get("normal").remove(segmentName);
     }
   }
 }
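
Note on the tester change above (illustration only, not part of the applied
diff): the removed else-branch mirrored the production bug. A pick that turns
out to be a no-op, for example because the segment is already in flight, used
to erase the in-flight entry of a move that was still running; now it leaves
the bookkeeping untouched. testMoveSameSegmentTwice exercises this path: the
predefined strategy hands out segment1 twice, the first attempt queues the
move and records it, and the second attempt is skipped without clearing that
record, so movedCount ends up as 1. A minimal sketch of the guard shape (names
are illustrative, not the actual tester code):

    import java.util.concurrent.ConcurrentHashMap;

    class MoveGuard
    {
      private final ConcurrentHashMap<String, String> moving = new ConcurrentHashMap<>();

      boolean tryStartMove(String segmentName)
      {
        if (!moving.containsKey(segmentName)) {
          moving.put(segmentName, segmentName); // recorded until the completion callback fires
          return true;
        }
        // Previously this branch also called moving.remove(segmentName),
        // wiping out the record of the move still in flight; now it is a no-op.
        return false;
      }

      void onMoveComplete(String segmentName)
      {
        moving.remove(segmentName); // removal happens only via the completion callback
      }
    }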