From 150902b95ccac2cca7f77185f7f578d0bacf3b0f Mon Sep 17 00:00:00 2001 From: Lucas Capistrant Date: Tue, 7 Dec 2021 16:47:46 -0600 Subject: [PATCH] clean up the balancing code around the batched vs deprecated way of sampling segments to balance (#11960) * clean up the balancing code around the batched vs deprecated way of sampling segments to balance * fix docs, clarify comments, add deprecated annotations to legacy code * remove unused variable * update dynamic config dialog in console to state percentOfSegmentsToConsiderPerMove deprecated * fix dynamic config text for percentOfSegmentsToConsiderPerMove * run prettier to cleanup coordinator-dynamic-config.tsx changes * update jest snapshot * update documentation per review feedback --- .../BalancerStrategyBenchmark.java | 24 ++++-- docs/configuration/index.md | 2 +- .../server/coordinator/BalancerStrategy.java | 85 ++++++++++++------- .../coordinator/CoordinatorDynamicConfig.java | 7 +- .../coordinator/duty/BalanceSegments.java | 23 +++-- .../coordinator/BalanceSegmentsTest.java | 30 ++++--- ...inator-dynamic-config-dialog.spec.tsx.snap | 2 +- .../coordinator-dynamic-config.tsx | 21 +++-- 8 files changed, 123 insertions(+), 71 deletions(-) diff --git a/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java b/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java index beeb8f72c36..b4d38e5d2cc 100644 --- a/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java @@ -73,6 +73,7 @@ public class BalancerStrategyBenchmark private int maxSegmentsToMove; private final List serverHolders = new ArrayList<>(); + private boolean useBatchedSegmentSampler; private int reservoirSize = 1; private double percentOfSegmentsToConsider = 100; private final BalancerStrategy balancerStrategy = new CostBalancerStrategy( @@ -85,9 +86,11 @@ public class BalancerStrategyBenchmark switch (mode) { case "50percentOfSegmentsToConsiderPerMove": percentOfSegmentsToConsider = 50; + useBatchedSegmentSampler = false; break; case "useBatchedSegmentSampler": reservoirSize = maxSegmentsToMove; + useBatchedSegmentSampler = true; break; default: } @@ -128,12 +131,21 @@ public class BalancerStrategyBenchmark @Benchmark public void pickSegmentsToMove(Blackhole blackhole) { - Iterator iterator = balancerStrategy.pickSegmentsToMove( - serverHolders, - Collections.emptySet(), - reservoirSize, - percentOfSegmentsToConsider - ); + Iterator iterator; + if (useBatchedSegmentSampler) { + iterator = balancerStrategy.pickSegmentsToMove( + serverHolders, + Collections.emptySet(), + reservoirSize + ); + } else { + iterator = balancerStrategy.pickSegmentsToMove( + serverHolders, + Collections.emptySet(), + percentOfSegmentsToConsider + ); + } + for (int i = 0; i < maxSegmentsToMove && iterator.hasNext(); i++) { blackhole.consume(iterator.next()); } diff --git a/docs/configuration/index.md b/docs/configuration/index.md index b9a1c86a628..4178d01e8b3 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -896,7 +896,7 @@ Issuing a GET request at the same URL will return the spec that is currently in |`mergeSegmentsLimit`|The maximum number of segments that can be in a single [append task](../ingestion/tasks.md).|100| |`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|5| |`useBatchedSegmentSampler`|Boolean flag for whether or not we should use the Reservoir Sampling with a reservoir of size k instead of fixed size 1 to pick segments to move. This option can be enabled to speed up segment balancing process, especially if there are huge number of segments in the cluster or if there are too many segments to move.|false| -|`percentOfSegmentsToConsiderPerMove`|The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (the least available capacity first) and then iterates over the servers. For each server, Druid iterates over the segments on the server, considering them for moving. The default config of 100% means that every segment on every server is a candidate to be moved. This should make sense for most small to medium-sized clusters. However, an admin may find it preferable to drop this value lower if they don't think that it is worthwhile to consider every single segment in the cluster each time it is looking for a segment to move.|100| +|`percentOfSegmentsToConsiderPerMove`|Deprecated. This will eventually be phased out by the batched segment sampler. You can enable the batched segment sampler now by setting the dynamic Coordinator config, `useBatchedSegmentSampler`, to `true`. Note that if you choose to enable the batched segment sampler, `percentOfSegmentsToConsiderPerMove` will no longer have any effect on balancing. If `useBatchedSegmentSampler == false`, this config defines the percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (the least available capacity first) and then iterates over the servers. For each server, Druid iterates over the segments on the server, considering them for moving. The default config of 100% means that every segment on every server is a candidate to be moved. This should make sense for most small to medium-sized clusters. However, an admin may find it preferable to drop this value lower if they don't think that it is worthwhile to consider every single segment in the cluster each time it is looking for a segment to move.|100| |`replicantLifetime`|The maximum number of Coordinator runs for a segment to be replicated before we start alerting.|15| |`replicationThrottleLimit`|The maximum number of segments that can be replicated at one time.|10| |`balancerComputeThreads`|Thread pool size for computing moving cost of segments in segment balancing. Consider increasing this if you have a lot of segments and moving segments starts to get stuck.|1| diff --git a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java index 78ab9514ae7..6478614ba57 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java @@ -65,6 +65,57 @@ public interface BalancerStrategy * the interval or period-based broadcast rules. For simplicity of the initial * implementation, only forever broadcast rules are supported. * @param reservoirSize the reservoir size maintained by the Reservoir Sampling algorithm. + * @return Iterator for set of {@link BalancerSegmentHolder} containing segment to move and server they currently + * reside on, or empty if there are no segments to pick from (i. e. all provided serverHolders are empty). + */ + default Iterator pickSegmentsToMove( + List serverHolders, + Set broadcastDatasources, + int reservoirSize + ) + { + return new Iterator() + { + private Iterator it = sample(); + private Iterator sample() + { + return ReservoirSegmentSampler.getRandomBalancerSegmentHolders( + serverHolders, + broadcastDatasources, + reservoirSize + ).iterator(); + } + + @Override + public boolean hasNext() + { + if (it.hasNext()) { + return true; + } + it = sample(); + return it.hasNext(); + } + + @Override + public BalancerSegmentHolder next() + { + return it.next(); + } + }; + } + + /** + * Pick the best segments to move from one of the supplied set of servers according to the balancing strategy. This + * is the deprecated way of picking a segment to move. pickSegmentsToMove(List, Set, int) uses + * a more performant bathced sampling method that will become the default picking mode in the future. + * + * @param serverHolders set of historicals to consider for moving segments + * @param broadcastDatasources Datasources that contain segments which were loaded via broadcast rules. + * Balancing strategies should avoid rebalancing segments for such datasources, since + * they should be loaded on all servers anyway. + * NOTE: this should really be handled on a per-segment basis, to properly support + * the interval or period-based broadcast rules. For simplicity of the initial + * implementation, only forever broadcast rules are supported. * @param percentOfSegmentsToConsider The percentage of the total number of segments that we will consider when * choosing which segment to move. {@link CoordinatorDynamicConfig} defines a * config percentOfSegmentsToConsiderPerMove that will be used as an argument @@ -72,45 +123,13 @@ public interface BalancerStrategy * @return Iterator for set of {@link BalancerSegmentHolder} containing segment to move and server they currently * reside on, or empty if there are no segments to pick from (i. e. all provided serverHolders are empty). */ + @Deprecated default Iterator pickSegmentsToMove( List serverHolders, Set broadcastDatasources, - int reservoirSize, double percentOfSegmentsToConsider ) { - if (reservoirSize > 1) { - return new Iterator() - { - private Iterator it = sample(); - - private Iterator sample() - { - return ReservoirSegmentSampler.getRandomBalancerSegmentHolders( - serverHolders, - broadcastDatasources, - reservoirSize - ).iterator(); - } - - @Override - public boolean hasNext() - { - if (it.hasNext()) { - return true; - } - it = sample(); - return it.hasNext(); - } - - @Override - public BalancerSegmentHolder next() - { - return it.next(); - } - }; - } - return new Iterator() { private BalancerSegmentHolder next = sample(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index 29d8052e967..9a37e2b98b3 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -54,6 +54,7 @@ public class CoordinatorDynamicConfig private final long mergeBytesLimit; private final int mergeSegmentsLimit; private final int maxSegmentsToMove; + @Deprecated private final double percentOfSegmentsToConsiderPerMove; private final boolean useBatchedSegmentSampler; private final int replicantLifetime; @@ -117,7 +118,7 @@ public class CoordinatorDynamicConfig @JsonProperty("mergeBytesLimit") long mergeBytesLimit, @JsonProperty("mergeSegmentsLimit") int mergeSegmentsLimit, @JsonProperty("maxSegmentsToMove") int maxSegmentsToMove, - @JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove, + @Deprecated @JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove, @JsonProperty("useBatchedSegmentSampler") boolean useBatchedSegmentSampler, @JsonProperty("replicantLifetime") int replicantLifetime, @JsonProperty("replicationThrottleLimit") int replicationThrottleLimit, @@ -274,6 +275,7 @@ public class CoordinatorDynamicConfig return maxSegmentsToMove; } + @Deprecated @JsonProperty public double getPercentOfSegmentsToConsiderPerMove() { @@ -559,7 +561,7 @@ public class CoordinatorDynamicConfig @JsonProperty("mergeBytesLimit") @Nullable Long mergeBytesLimit, @JsonProperty("mergeSegmentsLimit") @Nullable Integer mergeSegmentsLimit, @JsonProperty("maxSegmentsToMove") @Nullable Integer maxSegmentsToMove, - @JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove, + @Deprecated @JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove, @JsonProperty("useBatchedSegmentSampler") Boolean useBatchedSegmentSampler, @JsonProperty("replicantLifetime") @Nullable Integer replicantLifetime, @JsonProperty("replicationThrottleLimit") @Nullable Integer replicationThrottleLimit, @@ -623,6 +625,7 @@ public class CoordinatorDynamicConfig return this; } + @Deprecated public Builder withPercentOfSegmentsToConsiderPerMove(double percentOfSegmentsToConsiderPerMove) { this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java index 12de0177039..d2a1c4c8daa 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java @@ -56,8 +56,6 @@ public class BalanceSegments implements CoordinatorDuty protected final Map> currentlyMovingSegments = new HashMap<>(); - private static final int DEFAULT_RESERVOIR_SIZE = 1; - public BalanceSegments(DruidCoordinator coordinator) { this.coordinator = coordinator; @@ -202,12 +200,21 @@ public class BalanceSegments implements CoordinatorDuty final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue(); int moved = 0, unmoved = 0; - Iterator segmentsToMove = strategy.pickSegmentsToMove( - toMoveFrom, - params.getBroadcastDatasources(), - params.getCoordinatorDynamicConfig().useBatchedSegmentSampler() ? maxSegmentsToMove : DEFAULT_RESERVOIR_SIZE, - params.getCoordinatorDynamicConfig().getPercentOfSegmentsToConsiderPerMove() - ); + Iterator segmentsToMove; + // The pick method depends on if the operator has enabled batched segment sampling in the Coorinator dynamic config. + if (params.getCoordinatorDynamicConfig().useBatchedSegmentSampler()) { + segmentsToMove = strategy.pickSegmentsToMove( + toMoveFrom, + params.getBroadcastDatasources(), + maxSegmentsToMove + ); + } else { + segmentsToMove = strategy.pickSegmentsToMove( + toMoveFrom, + params.getBroadcastDatasources(), + params.getCoordinatorDynamicConfig().getPercentOfSegmentsToConsiderPerMove() + ); + } //noinspection ForLoopThatDoesntUseLoopVariable for (int iter = 0; (moved + unmoved) < maxSegmentsToMove; ++iter) { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java index 0221a373846..d4a89abb3d2 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java @@ -239,8 +239,7 @@ public class BalanceSegmentsTest new ServerHolder(druidServer2, peon2, true) ), broadcastDatasources, - 1, - 100 + 100.0 ) ).andReturn( ImmutableList.of( @@ -249,7 +248,7 @@ public class BalanceSegmentsTest ).iterator() ); - EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt())) + EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyDouble())) .andReturn( ImmutableList.of( new BalancerSegmentHolder(druidServer1, segment1), @@ -318,7 +317,7 @@ public class BalanceSegmentsTest BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); EasyMock.expect( - strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt())) + strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyDouble())) .andReturn( ImmutableList.of( new BalancerSegmentHolder(druidServer1, segment2), @@ -369,7 +368,7 @@ public class BalanceSegmentsTest mockCoordinator(coordinator); BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt())) + EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyDouble())) .andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator()) .anyTimes(); EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())).andAnswer(() -> { @@ -405,7 +404,7 @@ public class BalanceSegmentsTest ServerHolder holder2 = new ServerHolder(druidServer2, peon2, false); BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt())) + EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyDouble())) .andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator()) .once(); EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())) @@ -576,8 +575,7 @@ public class BalanceSegmentsTest new ServerHolder(druidServer1, peon1, false) ), broadcastDatasources, - 1, - 40 + 40.0 ) ) .andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer2, segment3)).iterator()); @@ -748,7 +746,16 @@ public class BalanceSegmentsTest public Iterator pickSegmentsToMove( List serverHolders, Set broadcastDatasources, - int numberOfSegments, + int numberOfSegments + ) + { + return pickOrder.iterator(); + } + + @Override + public Iterator pickSegmentsToMove( + List serverHolders, + Set broadcastDatasources, double percentOfSegmentsToConsider ) { @@ -780,13 +787,12 @@ public class BalanceSegmentsTest new ServerHolder(druidServer2, peon2, true) ), broadcastDatasources, - 1, - 100 + 100.0 ) ).andReturn( ImmutableList.of(new BalancerSegmentHolder(druidServer2, segment2)).iterator() ); - EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyDouble())) + EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyDouble())) .andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator()); EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())) .andReturn(new ServerHolder(druidServer3, peon3)) diff --git a/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap b/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap index b799bb5f780..8a9e3eae210 100644 --- a/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap +++ b/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap @@ -179,7 +179,7 @@ exports[`coordinator dynamic config matches snapshot 1`] = ` Object { "defaultValue": 100, "info": - The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (the least available capacity first) and then iterates over the servers. For each server, Druid iterates over the segments on the server, considering them for moving. The default config of 100% means that every segment on every server is a candidate to be moved. This should make sense for most small to medium-sized clusters. However, an admin may find it preferable to drop this value lower if they don't think that it is worthwhile to consider every single segment in the cluster each time it is looking for a segment to move. + Deprecated. This will eventually be phased out by the batched segment sampler. You can enable the batched segment sampler now by setting the dynamic Coordinator config, useBatchedSegmentSampler, to true. Note that if you choose to enable the batched segment sampler, percentOfSegmentsToConsiderPerMove will no longer have any effect on balancing. If useBatchedSegmentSampler == false, this config defines the percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (the least available capacity first) and then iterates over the servers. For each server, Druid iterates over the segments on the server, considering them for moving. The default config of 100% means that every segment on every server is a candidate to be moved. This should make sense for most small to medium-sized clusters. However, an admin may find it preferable to drop this value lower if they don't think that it is worthwhile to consider every single segment in the cluster each time it is looking for a segment to move. , "name": "percentOfSegmentsToConsiderPerMove", "type": "number", diff --git a/web-console/src/druid-models/coordinator-dynamic-config.tsx b/web-console/src/druid-models/coordinator-dynamic-config.tsx index ee519ca49c6..f646315a432 100644 --- a/web-console/src/druid-models/coordinator-dynamic-config.tsx +++ b/web-console/src/druid-models/coordinator-dynamic-config.tsx @@ -213,14 +213,19 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field[ defaultValue: 100, info: ( <> - The percentage of the total number of segments in the cluster that are considered every time - a segment needs to be selected for a move. Druid orders servers by available capacity - ascending (the least available capacity first) and then iterates over the servers. For each - server, Druid iterates over the segments on the server, considering them for moving. The - default config of 100% means that every segment on every server is a candidate to be moved. - This should make sense for most small to medium-sized clusters. However, an admin may find - it preferable to drop this value lower if they don't think that it is worthwhile to - consider every single segment in the cluster each time it is looking for a segment to move. + Deprecated. This will eventually be phased out by the batched segment sampler. You can + enable the batched segment sampler now by setting the dynamic Coordinator config, + useBatchedSegmentSampler, to true. Note that if you choose to enable the batched segment + sampler, percentOfSegmentsToConsiderPerMove will no longer have any effect on balancing. If + useBatchedSegmentSampler == false, this config defines the percentage of the total number of + segments in the cluster that are considered every time a segment needs to be selected for a + move. Druid orders servers by available capacity ascending (the least available capacity + first) and then iterates over the servers. For each server, Druid iterates over the segments + on the server, considering them for moving. The default config of 100% means that every + segment on every server is a candidate to be moved. This should make sense for most small to + medium-sized clusters. However, an admin may find it preferable to drop this value lower if + they don't think that it is worthwhile to consider every single segment in the cluster + each time it is looking for a segment to move. ), },