Coordinator segment balancer max load queue fix (#5888)

* Coordinator segment balancer will now respect "maxSegmentsInNodeLoadingQueue" config

* allow moves from full load queues

* better variable names
This commit is contained in:
Clint Wylie 2018-06-20 23:04:41 -07:00 committed by Gian Merlino
parent 0982472c90
commit 1a7adabf57
2 changed files with 63 additions and 18 deletions

View File

@ -20,9 +20,9 @@
package io.druid.server.coordinator.helper; package io.druid.server.coordinator.helper;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.client.ImmutableDruidServer; import io.druid.client.ImmutableDruidServer;
import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.StringUtils;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.server.coordinator.BalancerSegmentHolder; import io.druid.server.coordinator.BalancerSegmentHolder;
import io.druid.server.coordinator.BalancerStrategy; import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.CoordinatorStats; import io.druid.server.coordinator.CoordinatorStats;
@ -103,31 +103,37 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
return; return;
} }
final List<ServerHolder> serverHolderList = Lists.newArrayList(servers); final List<ServerHolder> toMoveFrom = Lists.newArrayList(servers);
final List<ServerHolder> toMoveTo = Lists.newArrayList(servers);
if (serverHolderList.size() <= 1) { if (toMoveTo.size() <= 1) {
log.info("[%s]: One or fewer servers found. Cannot balance.", tier); log.info("[%s]: One or fewer servers found. Cannot balance.", tier);
return; return;
} }
int numSegments = 0; int numSegments = 0;
for (ServerHolder server : serverHolderList) { for (ServerHolder sourceHolder : toMoveFrom) {
numSegments += server.getServer().getSegments().size(); numSegments += sourceHolder.getServer().getSegments().size();
} }
if (numSegments == 0) { if (numSegments == 0) {
log.info("No segments found. Cannot balance."); log.info("No segments found. Cannot balance.");
return; return;
} }
final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue();
long unmoved = 0L; long unmoved = 0L;
for (int iter = 0; iter < maxSegmentsToMove; iter++) { for (int iter = 0; iter < maxSegmentsToMove; iter++) {
final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(serverHolderList); if (maxToLoad > 0) {
toMoveTo.removeIf(s -> s.getNumberOfSegmentsInQueue() >= maxToLoad);
}
final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(toMoveFrom);
if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) { if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) {
final ServerHolder holder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), serverHolderList); final ServerHolder destinationHolder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), toMoveTo);
if (holder != null) { if (destinationHolder != null) {
moveSegment(segmentToMove, holder.getServer(), params); moveSegment(segmentToMove, destinationHolder.getServer(), params);
} else { } else {
++unmoved; ++unmoved;
} }
@ -140,7 +146,7 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
stats.addToTieredStat("unmovedCount", tier, unmoved); stats.addToTieredStat("unmovedCount", tier, unmoved);
stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size()); stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size());
if (params.getCoordinatorDynamicConfig().emitBalancingStats()) { if (params.getCoordinatorDynamicConfig().emitBalancingStats()) {
strategy.emitStats(tier, stats, serverHolderList); strategy.emitStats(tier, stats, toMoveFrom);
} }
log.info( log.info(
"[%s]: Segments Moved: [%d] Segments Let Alone: [%d]", "[%s]: Segments Moved: [%d] Segments Let Alone: [%d]",

View File

@ -162,7 +162,6 @@ public class DruidCoordinatorBalancerTest
balancerStrategyExecutor.shutdownNow(); balancerStrategyExecutor.shutdownNow();
} }
@Test @Test
public void testMoveToEmptyServerBalancer() public void testMoveToEmptyServerBalancer()
{ {
@ -185,7 +184,7 @@ public class DruidCoordinatorBalancerTest
) )
); );
DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder( DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2), ImmutableList.of(druidServer1, druidServer2),
ImmutableList.of(peon1, peon2) ImmutableList.of(peon1, peon2)
) )
@ -196,6 +195,48 @@ public class DruidCoordinatorBalancerTest
Assert.assertEquals(2, params.getCoordinatorStats().getTieredStat("movedCount", "normal")); Assert.assertEquals(2, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
} }
@Test
public void testMoveMaxLoadQueueServerBalancer()
{
  /*
   * Scenario: server 1 is mocked with the shared segment map and server 2 with an empty one.
   * The balancer runs with maxSegmentsInNodeLoadingQueue set to 1, so although four picks are
   * queued up and MAX_SEGMENTS_TO_MOVE is the move limit, only a single move is expected.
   */
  mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
  mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyMap());

  // Servers 3 and 4 are not handed to the balancer in this test; they are only switched to replay mode.
  EasyMock.replay(druidServer3);
  EasyMock.replay(druidServer4);

  // Mock stuff that the coordinator needs
  mockCoordinator(coordinator);

  // Every candidate pick comes from server 1, in a fixed order.
  final ImmutableList<BalancerSegmentHolder> pickOrder = ImmutableList.of(
      new BalancerSegmentHolder(druidServer1, segment1),
      new BalancerSegmentHolder(druidServer1, segment2),
      new BalancerSegmentHolder(druidServer1, segment3),
      new BalancerSegmentHolder(druidServer1, segment4)
  );
  final BalancerStrategy fixedOrderStrategy = new PredefinedPickOrderBalancerStrategy(balancerStrategy, pickOrder);

  // Cap each node's load queue at one segment.
  final CoordinatorDynamicConfig cappedQueueConfig = CoordinatorDynamicConfig
      .builder()
      .withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
      .withMaxSegmentsInNodeLoadingQueue(1)
      .build();

  final DruidCoordinatorRuntimeParams inputParams = defaultRuntimeParamsBuilder(
      ImmutableList.of(druidServer1, druidServer2),
      ImmutableList.of(peon1, peon2)
  )
      .withBalancerStrategy(fixedOrderStrategy)
      .withDynamicConfigs(cappedQueueConfig)
      .build();

  final DruidCoordinatorRuntimeParams outputParams = new DruidCoordinatorBalancerTester(coordinator).run(inputParams);

  // Only one move to server 2 is expected because its load queue may hold at most one segment.
  Assert.assertEquals(1, outputParams.getCoordinatorStats().getTieredStat("movedCount", "normal"));
}
@Test @Test
public void testMoveSameSegmentTwice() public void testMoveSameSegmentTwice()
{ {
@ -215,7 +256,7 @@ public class DruidCoordinatorBalancerTest
) )
); );
DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder( DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2), ImmutableList.of(druidServer1, druidServer2),
ImmutableList.of(peon1, peon2) ImmutableList.of(peon1, peon2)
) )
@ -244,7 +285,7 @@ public class DruidCoordinatorBalancerTest
// Mock stuff that the coordinator needs // Mock stuff that the coordinator needs
mockCoordinator(coordinator); mockCoordinator(coordinator);
DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder( DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2), ImmutableList.of(druidServer1, druidServer2),
ImmutableList.of(peon1, peon2) ImmutableList.of(peon1, peon2)
).build(); ).build();
@ -253,7 +294,6 @@ public class DruidCoordinatorBalancerTest
Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0); Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
} }
@Test @Test
public void testRun2() public void testRun2()
{ {
@ -266,13 +306,13 @@ public class DruidCoordinatorBalancerTest
// Mock stuff that the coordinator needs // Mock stuff that the coordinator needs
mockCoordinator(coordinator); mockCoordinator(coordinator);
DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(druidServers, peons).build(); DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(druidServers, peons).build();
params = new DruidCoordinatorBalancerTester(coordinator).run(params); params = new DruidCoordinatorBalancerTester(coordinator).run(params);
Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0); Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
} }
private DruidCoordinatorRuntimeParams.Builder defaullRuntimeParamsBuilder( private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder(
List<ImmutableDruidServer> druidServers, List<ImmutableDruidServer> druidServers,
List<LoadQueuePeon> peons List<LoadQueuePeon> peons
) )
@ -392,5 +432,4 @@ public class DruidCoordinatorBalancerTest
delegate.emitStats(tier, stats, serverHolderList); delegate.emitStats(tier, stats, serverHolderList);
} }
} }
} }