From 1a7adabf5768b5115f37d9924b194b5f7ff48862 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 20 Jun 2018 23:04:41 -0700 Subject: [PATCH] Coordinator segment balancer max load queue fix (#5888) * Coordinator segment balancer will now respect "maxSegmentsInNodeLoadingQueue" config * allow moves from full load queues * better variable names --- .../helper/DruidCoordinatorBalancer.java | 26 +++++---- .../DruidCoordinatorBalancerTest.java | 55 ++++++++++++++++--- 2 files changed, 63 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java index cc266d705df..15f13438163 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java @@ -20,9 +20,9 @@ package io.druid.server.coordinator.helper; import com.google.common.collect.Lists; -import io.druid.java.util.emitter.EmittingLogger; import io.druid.client.ImmutableDruidServer; import io.druid.java.util.common.StringUtils; +import io.druid.java.util.emitter.EmittingLogger; import io.druid.server.coordinator.BalancerSegmentHolder; import io.druid.server.coordinator.BalancerStrategy; import io.druid.server.coordinator.CoordinatorStats; @@ -103,31 +103,37 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper return; } - final List serverHolderList = Lists.newArrayList(servers); + final List toMoveFrom = Lists.newArrayList(servers); + final List toMoveTo = Lists.newArrayList(servers); - if (serverHolderList.size() <= 1) { + if (toMoveTo.size() <= 1) { log.info("[%s]: One or fewer servers found. Cannot balance.", tier); return; } int numSegments = 0; - for (ServerHolder server : serverHolderList) { - numSegments += server.getServer().getSegments().size(); + for (ServerHolder sourceHolder : toMoveFrom) { + numSegments += sourceHolder.getServer().getSegments().size(); } if (numSegments == 0) { log.info("No segments found. Cannot balance."); return; } + + final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue(); long unmoved = 0L; for (int iter = 0; iter < maxSegmentsToMove; iter++) { - final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(serverHolderList); + if (maxToLoad > 0) { + toMoveTo.removeIf(s -> s.getNumberOfSegmentsInQueue() >= maxToLoad); + } + final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(toMoveFrom); if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) { - final ServerHolder holder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), serverHolderList); + final ServerHolder destinationHolder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), toMoveTo); - if (holder != null) { - moveSegment(segmentToMove, holder.getServer(), params); + if (destinationHolder != null) { + moveSegment(segmentToMove, destinationHolder.getServer(), params); } else { ++unmoved; } @@ -140,7 +146,7 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper stats.addToTieredStat("unmovedCount", tier, unmoved); stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size()); if (params.getCoordinatorDynamicConfig().emitBalancingStats()) { - strategy.emitStats(tier, stats, serverHolderList); + strategy.emitStats(tier, stats, toMoveFrom); } log.info( "[%s]: Segments Moved: [%d] Segments Let Alone: [%d]", diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java index 2469bd50f0b..b5c3b210535 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java @@ -162,7 +162,6 @@ public class DruidCoordinatorBalancerTest balancerStrategyExecutor.shutdownNow(); } - @Test public void testMoveToEmptyServerBalancer() { @@ -185,7 +184,7 @@ public class DruidCoordinatorBalancerTest ) ); - DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder( + DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder( ImmutableList.of(druidServer1, druidServer2), ImmutableList.of(peon1, peon2) ) @@ -196,6 +195,48 @@ public class DruidCoordinatorBalancerTest Assert.assertEquals(2, params.getCoordinatorStats().getTieredStat("movedCount", "normal")); } + @Test + public void testMoveMaxLoadQueueServerBalancer() + { + mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments); + mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyMap()); + + EasyMock.replay(druidServer3); + EasyMock.replay(druidServer4); + + // Mock stuff that the coordinator needs + mockCoordinator(coordinator); + + BalancerStrategy predefinedPickOrderStrategy = new PredefinedPickOrderBalancerStrategy( + balancerStrategy, + ImmutableList.of( + new BalancerSegmentHolder(druidServer1, segment1), + new BalancerSegmentHolder(druidServer1, segment2), + new BalancerSegmentHolder(druidServer1, segment3), + new BalancerSegmentHolder(druidServer1, segment4) + ) + ); + + DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder( + ImmutableList.of(druidServer1, druidServer2), + ImmutableList.of(peon1, peon2) + ) + .withBalancerStrategy(predefinedPickOrderStrategy) + .withDynamicConfigs( + CoordinatorDynamicConfig + .builder() + .withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE) + .withMaxSegmentsInNodeLoadingQueue(1) + .build() + ) + .build(); + + params = new DruidCoordinatorBalancerTester(coordinator).run(params); + + // max to move is 5, all segments on server 1, but only expect to move 1 to server 2 since max node load queue is 1 + Assert.assertEquals(1, params.getCoordinatorStats().getTieredStat("movedCount", "normal")); + } + @Test public void testMoveSameSegmentTwice() { @@ -215,7 +256,7 @@ public class DruidCoordinatorBalancerTest ) ); - DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder( + DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder( ImmutableList.of(druidServer1, druidServer2), ImmutableList.of(peon1, peon2) ) @@ -244,7 +285,7 @@ public class DruidCoordinatorBalancerTest // Mock stuff that the coordinator needs mockCoordinator(coordinator); - DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder( + DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder( ImmutableList.of(druidServer1, druidServer2), ImmutableList.of(peon1, peon2) ).build(); @@ -253,7 +294,6 @@ public class DruidCoordinatorBalancerTest Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0); } - @Test public void testRun2() { @@ -266,13 +306,13 @@ public class DruidCoordinatorBalancerTest // Mock stuff that the coordinator needs mockCoordinator(coordinator); - DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(druidServers, peons).build(); + DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(druidServers, peons).build(); params = new DruidCoordinatorBalancerTester(coordinator).run(params); Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0); } - private DruidCoordinatorRuntimeParams.Builder defaullRuntimeParamsBuilder( + private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder( List druidServers, List peons ) @@ -392,5 +432,4 @@ public class DruidCoordinatorBalancerTest delegate.emitStats(tier, stats, serverHolderList); } } - }