mirror of
https://github.com/apache/druid.git
synced 2025-02-17 07:25:02 +00:00
refactor BalanceSegments#balanceServers to exit early if there is no work to be done (#11768)
* remove useless call to balanceServers for move from decom servers when there are no decom servers * refactor approach to this PR but accomplish the same thing
This commit is contained in:
parent
abac9e39ed
commit
43383c73a8
@ -144,6 +144,8 @@ public class BalanceSegments implements CoordinatorDuty
|
||||
}
|
||||
|
||||
final int maxSegmentsToMove = Math.min(params.getCoordinatorDynamicConfig().getMaxSegmentsToMove(), numSegments);
|
||||
|
||||
// Prioritize moving segments from decomissioning servers.
|
||||
int decommissioningMaxPercentOfMaxSegmentsToMove =
|
||||
params.getCoordinatorDynamicConfig().getDecommissioningMaxPercentOfMaxSegmentsToMove();
|
||||
int maxSegmentsToMoveFromDecommissioningNodes =
|
||||
@ -155,6 +157,7 @@ public class BalanceSegments implements CoordinatorDuty
|
||||
Pair<Integer, Integer> decommissioningResult =
|
||||
balanceServers(params, decommissioningServers, activeServers, maxSegmentsToMoveFromDecommissioningNodes);
|
||||
|
||||
// After moving segments from decomissioning servers, move the remaining segments from the rest of the servers.
|
||||
int maxGeneralSegmentsToMove = maxSegmentsToMove - decommissioningResult.lhs;
|
||||
log.info("Processing %d segments for balancing between active servers", maxGeneralSegmentsToMove);
|
||||
Pair<Integer, Integer> generalResult =
|
||||
@ -184,6 +187,13 @@ public class BalanceSegments implements CoordinatorDuty
|
||||
)
|
||||
{
|
||||
if (maxSegmentsToMove <= 0) {
|
||||
log.debug("maxSegmentsToMove is 0; no balancing work can be performed.");
|
||||
return new Pair<>(0, 0);
|
||||
} else if (toMoveFrom.isEmpty()) {
|
||||
log.debug("toMoveFrom is empty; no balancing work can be performed.");
|
||||
return new Pair<>(0, 0);
|
||||
} else if (toMoveTo.isEmpty()) {
|
||||
log.debug("toMoveTo is empty; no balancing work can be peformed.");
|
||||
return new Pair<>(0, 0);
|
||||
}
|
||||
|
||||
|
@ -236,7 +236,7 @@ public class BalanceSegmentsTest
|
||||
EasyMock.expect(
|
||||
strategy.pickSegmentsToMove(
|
||||
ImmutableList.of(
|
||||
new ServerHolder(druidServer2, peon2, false)
|
||||
new ServerHolder(druidServer2, peon2, true)
|
||||
),
|
||||
broadcastDatasources,
|
||||
1,
|
||||
@ -276,6 +276,7 @@ public class BalanceSegmentsTest
|
||||
.build();
|
||||
|
||||
params = new BalanceSegmentsTester(coordinator).run(params);
|
||||
EasyMock.verify(strategy);
|
||||
Assert.assertEquals(3L, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
|
||||
Assert.assertThat(
|
||||
peon3.getSegmentsToLoad(),
|
||||
@ -316,9 +317,8 @@ public class BalanceSegmentsTest
|
||||
mockCoordinator(coordinator);
|
||||
|
||||
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
|
||||
EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt()))
|
||||
.andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator());
|
||||
EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt()))
|
||||
EasyMock.expect(
|
||||
strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt()))
|
||||
.andReturn(
|
||||
ImmutableList.of(
|
||||
new BalancerSegmentHolder(druidServer1, segment2),
|
||||
@ -346,10 +346,11 @@ public class BalanceSegmentsTest
|
||||
.build();
|
||||
|
||||
params = new BalanceSegmentsTester(coordinator).run(params);
|
||||
EasyMock.verify(strategy);
|
||||
Assert.assertEquals(3L, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
|
||||
Assert.assertThat(
|
||||
peon3.getSegmentsToLoad(),
|
||||
Matchers.is(Matchers.equalTo(ImmutableSet.of(segment1, segment2, segment3)))
|
||||
Matchers.is(Matchers.equalTo(ImmutableSet.of(segment2, segment3, segment4)))
|
||||
);
|
||||
}
|
||||
|
||||
@ -387,6 +388,7 @@ public class BalanceSegmentsTest
|
||||
.build();
|
||||
|
||||
params = new BalanceSegmentsTester(coordinator).run(params);
|
||||
EasyMock.verify(strategy);
|
||||
Assert.assertEquals(0, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
|
||||
}
|
||||
|
||||
@ -422,6 +424,7 @@ public class BalanceSegmentsTest
|
||||
.build();
|
||||
|
||||
params = new BalanceSegmentsTester(coordinator).run(params);
|
||||
EasyMock.verify(strategy);
|
||||
Assert.assertEquals(1, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
|
||||
Assert.assertEquals(0, peon1.getNumberOfSegmentsInQueue());
|
||||
Assert.assertEquals(1, peon2.getNumberOfSegmentsInQueue());
|
||||
@ -564,18 +567,7 @@ public class BalanceSegmentsTest
|
||||
|
||||
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
|
||||
|
||||
// The first call for decommissioning servers
|
||||
EasyMock.expect(
|
||||
strategy.pickSegmentsToMove(
|
||||
ImmutableList.of(),
|
||||
broadcastDatasources,
|
||||
1,
|
||||
40
|
||||
)
|
||||
)
|
||||
.andReturn(Collections.emptyIterator());
|
||||
|
||||
// The second call for the single non decommissioning server move
|
||||
// Move from non-decomissioning servers
|
||||
EasyMock.expect(
|
||||
strategy.pickSegmentsToMove(
|
||||
ImmutableList.of(
|
||||
@ -611,6 +603,7 @@ public class BalanceSegmentsTest
|
||||
.build();
|
||||
|
||||
params = new BalanceSegmentsTester(coordinator).run(params);
|
||||
EasyMock.verify(strategy);
|
||||
Assert.assertEquals(1L, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
|
||||
Assert.assertThat(
|
||||
peon3.getSegmentsToLoad(),
|
||||
|
Loading…
x
Reference in New Issue
Block a user