diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 6f18386c1c5..657b652b466 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -746,6 +746,7 @@ A sample Coordinator dynamic config JSON object is shown below:
   "mergeBytesLimit": 100000000,
   "mergeSegmentsLimit" : 1000,
   "maxSegmentsToMove": 5,
+  "percentOfSegmentsToConsiderPerMove": 100,
   "replicantLifetime": 15,
   "replicationThrottleLimit": 10,
   "emitBalancingStats": false,
@@ -764,6 +765,7 @@ Issuing a GET request at the same URL will return the spec that is currently in
 |`mergeBytesLimit`|The maximum total uncompressed size in bytes of segments to merge.|524288000L|
 |`mergeSegmentsLimit`|The maximum number of segments that can be in a single [append task](../ingestion/tasks.md).|100|
 |`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|5|
+|`percentOfSegmentsToConsiderPerMove`|The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (the least available capacity first) and then iterates over the servers. For each server, Druid iterates over the segments on the server, considering them for moving. The default config of 100% means that every segment on every server is a candidate to be moved. This is a sensible default for most small to medium-sized clusters. However, an admin may find it preferable to lower this value if they don't think it is worthwhile to consider every single segment in the cluster each time the Coordinator looks for a segment to move.|100|
 |`replicantLifetime`|The maximum number of Coordinator runs for a segment to be replicated before we start alerting.|15|
 |`replicationThrottleLimit`|The maximum number of segments that can be replicated at one time.|10|
 |`balancerComputeThreads`|Thread pool size for computing moving cost of segments in segment balancing. Consider increasing this if you have a lot of segments and moving segments starts to get stuck.|1|
diff --git a/docs/operations/basic-cluster-tuning.md b/docs/operations/basic-cluster-tuning.md
index c3413eca1b9..1f7253c23f2 100644
--- a/docs/operations/basic-cluster-tuning.md
+++ b/docs/operations/basic-cluster-tuning.md
@@ -280,6 +280,23 @@ The heap requirements of the Coordinator scale with the number of servers, segme
 
 You can set the Coordinator heap to the same size as your Broker heap, or slightly smaller: both services have to process cluster-wide state and answer API requests about this state.
 
+#### Dynamic Configuration
+
+`percentOfSegmentsToConsiderPerMove`
+* The default value is 100. This means that the Coordinator will consider all segments when it is looking for a segment to move. The Coordinator makes a weighted choice, with segments on servers with the least available capacity being the most likely to be moved.
+  * This weighted selection strategy means that the segments on the servers with the most available capacity are the least likely to be chosen.
+  * As the number of segments in the cluster increases, the probability of choosing the Nth segment to move decreases, where N is the last segment considered for moving.
+  * An admin can use this config to skip consideration of that Nth segment.
+* Instead of skipping a precise number of segments, we skip a percentage of the segments in the cluster.
+  * For example, with the value set to 25, only the first 25% of segments will be considered as candidates to move. This 25% of segments will come from the servers that have the least available capacity.
+  * In this example, each time the Coordinator looks for a segment to move, it will consider 75% fewer segments than it did when the configuration was 100. On clusters with hundreds of thousands of segments, this can add up to meaningful savings in coordination time.
+* General recommendations for this configuration (a worked sketch follows this list):
+  * If you are not worried about the amount of time it takes your Coordinator to complete a full coordination cycle, you likely do not need to modify this config.
+  * If you are frustrated with how long the Coordinator takes to run a full coordination cycle, and you have set the Coordinator dynamic config `maxSegmentsToMove` to a value above 0 (the default is 5), setting this config to a non-default value can help shorten coordination time.
+  * The recommended starting point value is 66. It represents a meaningful decrease in the percentage of segments considered while also not being too aggressive (you will consider 1/3 fewer segments per move operation with this value).
+* The impact that modifying this config has on your coordination time will be a function of how low you set the config value, the value of `maxSegmentsToMove`, and the total number of segments in your cluster.
+  * If your cluster has a relatively small number of segments, or you choose to move few segments per coordination cycle, there may not be much coordination time to save here.
+
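[Editor's note] To make the arithmetic in the bullets above concrete, here is a small back-of-the-envelope sketch; the cluster size and values are purely illustrative, not from this patch:

```java
public class ConsiderationMath
{
  public static void main(String[] args)
  {
    final int totalSegments = 200_000; // hypothetical cluster
    final int maxSegmentsToMove = 5;   // the default

    for (double percent : new double[]{100, 66, 25}) {
      // Mirrors the ceil() limit this patch adds to ReservoirSegmentSampler.
      int consideredPerPick = (int) Math.ceil(totalSegments * (percent / 100.0));
      System.out.printf(
          "percent=%.0f -> up to %,d segments scanned per pick, %,d per coordination cycle%n",
          percent,
          consideredPerPick,
          consideredPerPick * maxSegmentsToMove
      );
    }
  }
}
```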
 ### Overlord
 
 The main performance-related setting on the Overlord is the heap size.
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
index 889c167c8ef..db451693674 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
@@ -63,11 +63,20 @@ public interface BalancerStrategy
    * NOTE: this should really be handled on a per-segment basis, to properly support
    * the interval or period-based broadcast rules. For simplicity of the initial
    * implementation, only forever broadcast rules are supported.
+   * @param percentOfSegmentsToConsider The percentage of the total number of segments that we will consider when
+   *                                    choosing which segment to move. {@link CoordinatorDynamicConfig} defines a
+   *                                    config percentOfSegmentsToConsiderPerMove that will be used as an argument
+   *                                    for implementations of this method.
+   *
    * @return {@link BalancerSegmentHolder} containing segment to move and server it currently resides on, or null if
    *         there are no segments to pick from (i.e. all provided serverHolders are empty).
    */
   @Nullable
-  BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders, Set<String> broadcastDatasources);
+  BalancerSegmentHolder pickSegmentToMove(
+      List<ServerHolder> serverHolders,
+      Set<String> broadcastDatasources,
+      double percentOfSegmentsToConsider
+  );
 
   /**
    * Returns an iterator for a set of servers to drop from, ordered by preference of which server to drop from first
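[Editor's note] To illustrate the contract the new parameter adds to this interface, here is a toy picker in the same shape as the real implementations that follow. It is hypothetical, not part of this patch: it ignores balancing cost entirely and simply returns the first movable segment within the cap. It assumes `getNumSegments()` and `iterateAllSegments()` on `ImmutableDruidServer`, both of which this patch and the surrounding code also use.

```java
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.druid.server.coordinator.BalancerSegmentHolder;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.timeline.DataSegment;

final class FirstFitPicker
{
  /**
   * Hypothetical illustration of honoring percentOfSegmentsToConsider: stop looking
   * once the allotted share of the cluster's segments has been examined.
   */
  @Nullable
  static BalancerSegmentHolder pickSegmentToMove(
      List<ServerHolder> serverHolders,
      Set<String> broadcastDatasources,
      double percentOfSegmentsToConsider
  )
  {
    long totalSegments = 0;
    for (ServerHolder holder : serverHolders) {
      totalSegments += holder.getServer().getNumSegments();
    }
    long limit = (long) Math.ceil(totalSegments * (percentOfSegmentsToConsider / 100.0));

    long examined = 0;
    for (ServerHolder holder : serverHolders) {
      for (DataSegment segment : holder.getServer().iterateAllSegments()) {
        if (examined++ >= limit) {
          return null; // consulted our quota of segments without settling on one
        }
        if (!broadcastDatasources.contains(segment.getDataSource())) {
          return new BalancerSegmentHolder(holder.getServer(), segment);
        }
      }
    }
    return null;
  }
}
```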
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
index bafeb73c508..4415f6a8ce9 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
@@ -53,6 +53,7 @@ public class CoordinatorDynamicConfig
   private final long mergeBytesLimit;
   private final int mergeSegmentsLimit;
   private final int maxSegmentsToMove;
+  private final double percentOfSegmentsToConsiderPerMove;
   private final int replicantLifetime;
   private final int replicationThrottleLimit;
   private final int balancerComputeThreads;
@@ -95,6 +96,7 @@ public class CoordinatorDynamicConfig
       @JsonProperty("mergeBytesLimit") long mergeBytesLimit,
       @JsonProperty("mergeSegmentsLimit") int mergeSegmentsLimit,
       @JsonProperty("maxSegmentsToMove") int maxSegmentsToMove,
+      @JsonProperty("percentOfSegmentsToConsiderPerMove") double percentOfSegmentsToConsiderPerMove,
       @JsonProperty("replicantLifetime") int replicantLifetime,
       @JsonProperty("replicationThrottleLimit") int replicationThrottleLimit,
       @JsonProperty("balancerComputeThreads") int balancerComputeThreads,
@@ -123,6 +125,13 @@ public class CoordinatorDynamicConfig
     this.mergeBytesLimit = mergeBytesLimit;
     this.mergeSegmentsLimit = mergeSegmentsLimit;
     this.maxSegmentsToMove = maxSegmentsToMove;
+
+    Preconditions.checkArgument(
+        percentOfSegmentsToConsiderPerMove > 0 && percentOfSegmentsToConsiderPerMove <= 100,
+        "percentOfSegmentsToConsiderPerMove should be greater than 0 and less than or equal to 100!"
+    );
+    this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove;
+
     this.replicantLifetime = replicantLifetime;
     this.replicationThrottleLimit = replicationThrottleLimit;
     this.balancerComputeThreads = Math.max(balancerComputeThreads, 1);
@@ -211,6 +220,12 @@ public class CoordinatorDynamicConfig
     return maxSegmentsToMove;
   }
 
+  @JsonProperty
+  public double getPercentOfSegmentsToConsiderPerMove()
+  {
+    return percentOfSegmentsToConsiderPerMove;
+  }
+
   @JsonProperty
   public int getReplicantLifetime()
   {
@@ -302,6 +317,7 @@ public class CoordinatorDynamicConfig
            ", mergeBytesLimit=" + mergeBytesLimit +
            ", mergeSegmentsLimit=" + mergeSegmentsLimit +
            ", maxSegmentsToMove=" + maxSegmentsToMove +
+           ", percentOfSegmentsToConsiderPerMove=" + percentOfSegmentsToConsiderPerMove +
            ", replicantLifetime=" + replicantLifetime +
            ", replicationThrottleLimit=" + replicationThrottleLimit +
            ", balancerComputeThreads=" + balancerComputeThreads +
@@ -341,6 +357,9 @@ public class CoordinatorDynamicConfig
     if (maxSegmentsToMove != that.maxSegmentsToMove) {
       return false;
     }
+    if (percentOfSegmentsToConsiderPerMove != that.percentOfSegmentsToConsiderPerMove) {
+      return false;
+    }
     if (replicantLifetime != that.replicantLifetime) {
       return false;
     }
@@ -382,6 +401,7 @@ public class CoordinatorDynamicConfig
         mergeBytesLimit,
         mergeSegmentsLimit,
         maxSegmentsToMove,
+        percentOfSegmentsToConsiderPerMove,
         replicantLifetime,
         replicationThrottleLimit,
         balancerComputeThreads,
@@ -408,6 +428,7 @@ public class CoordinatorDynamicConfig
     private static final long DEFAULT_MERGE_BYTES_LIMIT = 524_288_000L;
     private static final int DEFAULT_MERGE_SEGMENTS_LIMIT = 100;
     private static final int DEFAULT_MAX_SEGMENTS_TO_MOVE = 5;
+    private static final int DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE = 100;
     private static final int DEFAULT_REPLICANT_LIFETIME = 15;
     private static final int DEFAULT_REPLICATION_THROTTLE_LIMIT = 10;
     private static final int DEFAULT_BALANCER_COMPUTE_THREADS = 1;
@@ -421,6 +442,7 @@ public class CoordinatorDynamicConfig
     private Long mergeBytesLimit;
     private Integer mergeSegmentsLimit;
     private Integer maxSegmentsToMove;
+    private Double percentOfSegmentsToConsiderPerMove;
     private Integer replicantLifetime;
     private Integer replicationThrottleLimit;
     private Boolean emitBalancingStats;
@@ -444,6 +466,7 @@ public class CoordinatorDynamicConfig
         @JsonProperty("mergeBytesLimit") @Nullable Long mergeBytesLimit,
         @JsonProperty("mergeSegmentsLimit") @Nullable Integer mergeSegmentsLimit,
         @JsonProperty("maxSegmentsToMove") @Nullable Integer maxSegmentsToMove,
+        @JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove,
         @JsonProperty("replicantLifetime") @Nullable Integer replicantLifetime,
         @JsonProperty("replicationThrottleLimit") @Nullable Integer replicationThrottleLimit,
         @JsonProperty("balancerComputeThreads") @Nullable Integer balancerComputeThreads,
@@ -463,6 +486,7 @@ public class CoordinatorDynamicConfig
       this.mergeBytesLimit = mergeBytesLimit;
       this.mergeSegmentsLimit = mergeSegmentsLimit;
       this.maxSegmentsToMove = maxSegmentsToMove;
+      this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove;
       this.replicantLifetime = replicantLifetime;
       this.replicationThrottleLimit = replicationThrottleLimit;
       this.balancerComputeThreads = balancerComputeThreads;
@@ -500,6 +524,12 @@ public class CoordinatorDynamicConfig
       return this;
     }
 
+    public Builder withPercentOfSegmentsToConsiderPerMove(double percentOfSegmentsToConsiderPerMove)
+    {
+      this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove;
+      return this;
+    }
+
     public Builder withReplicantLifetime(int replicantLifetime)
     {
       this.replicantLifetime = replicantLifetime;
@@ -569,6 +599,8 @@ public class CoordinatorDynamicConfig
           mergeBytesLimit == null ? DEFAULT_MERGE_BYTES_LIMIT : mergeBytesLimit,
           mergeSegmentsLimit == null ? DEFAULT_MERGE_SEGMENTS_LIMIT : mergeSegmentsLimit,
           maxSegmentsToMove == null ? DEFAULT_MAX_SEGMENTS_TO_MOVE : maxSegmentsToMove,
+          percentOfSegmentsToConsiderPerMove == null ? DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
+                                                     : percentOfSegmentsToConsiderPerMove,
           replicantLifetime == null ? DEFAULT_REPLICANT_LIFETIME : replicantLifetime,
           replicationThrottleLimit == null ? DEFAULT_REPLICATION_THROTTLE_LIMIT : replicationThrottleLimit,
           balancerComputeThreads == null ? DEFAULT_BALANCER_COMPUTE_THREADS : balancerComputeThreads,
@@ -598,6 +630,7 @@ public class CoordinatorDynamicConfig
           mergeBytesLimit == null ? defaults.getMergeBytesLimit() : mergeBytesLimit,
           mergeSegmentsLimit == null ? defaults.getMergeSegmentsLimit() : mergeSegmentsLimit,
           maxSegmentsToMove == null ? defaults.getMaxSegmentsToMove() : maxSegmentsToMove,
+          percentOfSegmentsToConsiderPerMove == null ? defaults.getPercentOfSegmentsToConsiderPerMove() : percentOfSegmentsToConsiderPerMove,
           replicantLifetime == null ? defaults.getReplicantLifetime() : replicantLifetime,
           replicationThrottleLimit == null ? defaults.getReplicationThrottleLimit() : replicationThrottleLimit,
           balancerComputeThreads == null ? defaults.getBalancerComputeThreads() : balancerComputeThreads,
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
index e5e3cb57b12..ac56544b073 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
@@ -214,10 +214,15 @@ public class CostBalancerStrategy implements BalancerStrategy
   @Override
   public BalancerSegmentHolder pickSegmentToMove(
       final List<ServerHolder> serverHolders,
-      Set<String> broadcastDatasources
+      Set<String> broadcastDatasources,
+      double percentOfSegmentsToConsider
   )
   {
-    return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders, broadcastDatasources);
+    return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(
+        serverHolders,
+        broadcastDatasources,
+        percentOfSegmentsToConsider
+    );
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java
index de3e46e66f4..8f3b96d67ab 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java
@@ -54,9 +54,17 @@ public class RandomBalancerStrategy implements BalancerStrategy
   }
 
   @Override
-  public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders, Set<String> broadcastDatasources)
+  public BalancerSegmentHolder pickSegmentToMove(
+      List<ServerHolder> serverHolders,
+      Set<String> broadcastDatasources,
+      double percentOfSegmentsToConsider
+  )
   {
-    return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders, broadcastDatasources);
+    return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(
+        serverHolders,
+        broadcastDatasources,
+        percentOfSegmentsToConsider
+    );
  }
 
   @Override
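[Editor's note] As a quick, self-contained sanity check of the builder and default wired up in `CoordinatorDynamicConfig` above (class and method names are from this patch; the standalone `main` is illustrative):

```java
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;

public class DynamicConfigExample
{
  public static void main(String[] args)
  {
    // Leaving the field unset falls back to DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE (100).
    CoordinatorDynamicConfig defaults = CoordinatorDynamicConfig.builder().build();
    System.out.println(defaults.getPercentOfSegmentsToConsiderPerMove()); // 100.0

    // 66 is the starting point recommended in the tuning docs above.
    CoordinatorDynamicConfig tuned = CoordinatorDynamicConfig.builder()
                                                             .withPercentOfSegmentsToConsiderPerMove(66)
                                                             .build();
    System.out.println(tuned.getPercentOfSegmentsToConsiderPerMove()); // 66.0
  }
}
```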
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java
index 7181d52e152..dd43760ec87 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.server.coordinator;
 
+import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.List;
@@ -28,15 +29,54 @@ import java.util.concurrent.ThreadLocalRandom;
 final class ReservoirSegmentSampler
 {
+  private static final EmittingLogger log = new EmittingLogger(ReservoirSegmentSampler.class);
+
+  /**
+   * Iterates over the segments that live on the candidate servers passed in via {@link ServerHolder} and (possibly)
+   * picks a segment to return to the caller in a {@link BalancerSegmentHolder} object.
+   *
+   * @param serverHolders List of {@link ServerHolder} objects containing segments that are candidates to be chosen.
+   * @param broadcastDatasources Set of datasource names that identify broadcast datasources. We don't want to
+   *                             consider segments from these datasources.
+   * @param percentOfSegmentsToConsider The percentage of total cluster segments to consider before short-circuiting
+   *                                    and returning immediately.
+   * @return a {@link BalancerSegmentHolder} containing the picked segment and the server it currently resides on,
+   *         or null if no segment was picked.
+   */
   static BalancerSegmentHolder getRandomBalancerSegmentHolder(
       final List<ServerHolder> serverHolders,
-      Set<String> broadcastDatasources
+      Set<String> broadcastDatasources,
+      double percentOfSegmentsToConsider
   )
   {
     ServerHolder fromServerHolder = null;
     DataSegment proposalSegment = null;
+    int calculatedSegmentLimit = Integer.MAX_VALUE;
     int numSoFar = 0;
 
+    // Reset a bad value of percentOfSegmentsToConsider to 100. We don't allow considering less than or equal to
+    // 0% of segments, or more than 100% of segments.
+    if (percentOfSegmentsToConsider <= 0 || percentOfSegmentsToConsider > 100) {
+      log.warn("Resetting percentOfSegmentsToConsider to 100 because only values greater than 0 and up to 100 are allowed."
+ + " You Provided [%f]", percentOfSegmentsToConsider); + percentOfSegmentsToConsider = 100; + } + + // Calculate the integer limit for the number of segments to be considered for moving if % is less than 100 + if (percentOfSegmentsToConsider < 100) { + int totalSegments = 0; + for (ServerHolder server : serverHolders) { + totalSegments += server.getServer().getNumSegments(); + } + // If totalSegments are zero, we will assume it is a mistake and move on to iteration without updating + // calculatedSegmentLimit + if (totalSegments != 0) { + calculatedSegmentLimit = (int) Math.ceil((double) totalSegments * (percentOfSegmentsToConsider / 100.0)); + } else { + log.warn("Unable to calculate limit on segments to consider because ServerHolder collection indicates" + + " zero segments existing in the cluster."); + } + } + for (ServerHolder server : serverHolders) { if (!server.getServer().getType().isSegmentReplicationTarget()) { // if the server only handles broadcast segments (which don't need to be rebalanced), we have nothing to do @@ -56,6 +96,19 @@ final class ReservoirSegmentSampler proposalSegment = segment; } numSoFar++; + + // We have iterated over the alloted number of segments and will return the currently proposed segment or null + // We will only break out early if we are iterating less than 100% of the total cluster segments + if (percentOfSegmentsToConsider < 100 && numSoFar >= calculatedSegmentLimit) { + log.debug("Breaking out of iteration over potential segments to move because we hit the limit [%f percent] of" + + " segments to consider to move. Segments Iterated: [%d]", percentOfSegmentsToConsider, numSoFar); + break; + } + } + // We have iterated over the alloted number of segments and will return the currently proposed segment or null + // We will only break out early if we are iterating less than 100% of the total cluster segments + if (percentOfSegmentsToConsider < 100 && numSoFar >= calculatedSegmentLimit) { + break; } } if (fromServerHolder != null) { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java index a1c5237ddd1..d1fca19e03b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java @@ -189,7 +189,8 @@ public class BalanceSegments implements CoordinatorDuty for (int iter = 0; (moved + unmoved) < maxSegmentsToMove; ++iter) { final BalancerSegmentHolder segmentToMoveHolder = strategy.pickSegmentToMove( toMoveFrom, - params.getBroadcastDatasources() + params.getBroadcastDatasources(), + params.getCoordinatorDynamicConfig().getPercentOfSegmentsToConsiderPerMove() ); if (segmentToMoveHolder == null) { log.info("All servers to move segments from are empty, ending run."); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java index f37c92cb105..26175f61321 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java @@ -232,10 +232,19 @@ public class BalanceSegmentsTest mockCoordinator(coordinator); BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, false)), 
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java
index f37c92cb105..26175f61321 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java
@@ -232,10 +232,19 @@ public class BalanceSegmentsTest
     mockCoordinator(coordinator);
 
     BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
-    EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, false)), broadcastDatasources))
-            .andReturn(new BalancerSegmentHolder(druidServer2, segment3))
-            .andReturn(new BalancerSegmentHolder(druidServer2, segment4));
-    EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
+    EasyMock.expect(
+        strategy.pickSegmentToMove(
+            ImmutableList.of(
+                new ServerHolder(druidServer2, peon2, false)
+            ),
+            broadcastDatasources,
+            100
+        )
+    ).andReturn(new BalancerSegmentHolder(druidServer2, segment3))
+     .andReturn(new BalancerSegmentHolder(druidServer2, segment4));
+
+    EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt()))
             .andReturn(new BalancerSegmentHolder(druidServer1, segment1))
             .andReturn(new BalancerSegmentHolder(druidServer1, segment2));
 
@@ -300,7 +309,7 @@ public class BalanceSegmentsTest
     mockCoordinator(coordinator);
 
     BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
-    EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
+    EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt()))
             .andReturn(new BalancerSegmentHolder(druidServer1, segment1))
             .andReturn(new BalancerSegmentHolder(druidServer1, segment2))
             .andReturn(new BalancerSegmentHolder(druidServer2, segment3))
@@ -349,7 +358,7 @@ public class BalanceSegmentsTest
     mockCoordinator(coordinator);
 
     BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
-    EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
+    EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt()))
             .andReturn(new BalancerSegmentHolder(druidServer1, segment1))
             .anyTimes();
     EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())).andAnswer(() -> {
@@ -384,7 +393,7 @@ public class BalanceSegmentsTest
     ServerHolder holder2 = new ServerHolder(druidServer2, peon2, false);
 
     BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
-    EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
+    EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt()))
             .andReturn(new BalancerSegmentHolder(druidServer1, segment1))
             .once();
     EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject()))
@@ -527,6 +536,76 @@ public class BalanceSegmentsTest
     Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
   }
 
+  /**
+   * Tests that the dynamic coordinator config value, percentOfSegmentsToConsiderPerMove, is honored when calling
+   * out to pickSegmentToMove. This config limits the number of segments that are considered when looking for a
+   * segment to move.
+   */
+  @Test
+  public void testThatDynamicConfigIsHonoredWhenPickingSegmentToMove()
+  {
+    mockDruidServer(druidServer1, "1", "normal", 50L, 100L, Arrays.asList(segment1, segment2));
+    mockDruidServer(druidServer2, "2", "normal", 30L, 100L, Arrays.asList(segment3, segment4));
+    mockDruidServer(druidServer3, "3", "normal", 0L, 100L, Collections.emptyList());
+
+    EasyMock.replay(druidServer4);
+
+    mockCoordinator(coordinator);
+
+    BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
+
+    // The first call is made for the decommissioning servers
+    EasyMock.expect(
+        strategy.pickSegmentToMove(
+            ImmutableList.of(),
+            broadcastDatasources,
+            40
+        )
+    )
+            .andReturn(null);
+
+    // The second call is made for the single move from the non-decommissioning servers
+    EasyMock.expect(
+        strategy.pickSegmentToMove(
+            ImmutableList.of(
+                new ServerHolder(druidServer3, peon3, false),
+                new ServerHolder(druidServer2, peon2, false),
+                new ServerHolder(druidServer1, peon1, false)
+            ),
+            broadcastDatasources,
+            40
+        )
+    )
+            .andReturn(new BalancerSegmentHolder(druidServer2, segment3));
+
+    EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject()))
+            .andReturn(new ServerHolder(druidServer3, peon3))
+            .anyTimes();
+    EasyMock.replay(strategy);
+
+    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
+        ImmutableList.of(druidServer1, druidServer2, druidServer3),
+        ImmutableList.of(peon1, peon2, peon3),
+        ImmutableList.of(false, false, false)
+    )
+        .withDynamicConfigs(
+            CoordinatorDynamicConfig.builder()
+                                    .withMaxSegmentsToMove(1)
+                                    .withPercentOfSegmentsToConsiderPerMove(40)
+                                    .build()
+        )
+        .withBalancerStrategy(strategy)
+        .withBroadcastDatasources(broadcastDatasources)
+        .build();
+
+    params = new BalanceSegmentsTester(coordinator).run(params);
+    Assert.assertEquals(1L, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
+    Assert.assertThat(
+        peon3.getSegmentsToLoad(),
+        Matchers.is(Matchers.equalTo(ImmutableSet.of(segment3)))
+    );
+  }
+
   private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder(
       List<ImmutableDruidServer> druidServers,
       List<LoadQueuePeon> peons
@@ -637,7 +716,11 @@ public class BalanceSegmentsTest
     }
 
     @Override
-    public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders, Set<String> broadcastDatasources)
+    public BalancerSegmentHolder pickSegmentToMove(
+        List<ServerHolder> serverHolders,
+        Set<String> broadcastDatasources,
+        double percentOfSegmentsToConsider
+    )
     {
       return pickOrder.get(pickCounter.getAndIncrement() % pickOrder.size());
     }
@@ -661,9 +744,18 @@ public class BalanceSegmentsTest
     // either the decommissioning servers list or the active ones (i.e. the servers list is [2] or [1, 3])
     BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
-    EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, true)), broadcastDatasources))
-            .andReturn(new BalancerSegmentHolder(druidServer2, segment2));
-    EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
+    EasyMock.expect(
+        strategy.pickSegmentToMove(
+            ImmutableList.of(
+                new ServerHolder(druidServer2, peon2, true)
+            ),
+            broadcastDatasources,
+            100
+        )
+    ).andReturn(new BalancerSegmentHolder(druidServer2, segment2));
+
+    EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt()))
             .andReturn(new BalancerSegmentHolder(druidServer1, segment1));
     EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject()))
             .andReturn(new ServerHolder(druidServer3, peon3))
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java
index 8f84ad6e295..f59f2427403 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java
@@ -174,14 +174,74 @@ public class ReservoirSegmentSamplerTest
     Map<DataSegment, Integer> segmentCountMap = new HashMap<>();
     for (int i = 0; i < iterations; i++) {
       // due to the pseudo-randomness of this method, we may not select a segment every single time no matter what.
-      BalancerSegmentHolder balancerSegmentHolder = ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet());
+      BalancerSegmentHolder balancerSegmentHolder = ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet(), 100);
       if (balancerSegmentHolder != null) {
         segmentCountMap.put(balancerSegmentHolder.getSegment(), 1);
       }
     }
 
     for (DataSegment segment : segments) {
-      Assert.assertEquals(segmentCountMap.get(segment), new Integer(1));
+      Assert.assertEquals(new Integer(1), segmentCountMap.get(segment));
+    }
+
+    EasyMock.verify(druidServer1, druidServer2, druidServer3, druidServer4);
+    EasyMock.verify(holder1, holder2, holder3, holder4);
+  }
+
+  /**
+   * Makes sure that the segment on server4 is never chosen in 5k iterations, because its segments should never be
+   * checked due to the limit on segment candidates.
+   */
+  @Test
+  public void getRandomBalancerSegmentHolderTestSegmentsToConsiderLimit()
+  {
+    int iterations = 5000;
+
+    EasyMock.expect(druidServer1.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
+    ImmutableDruidServerTests.expectSegments(druidServer1, segments1);
+    EasyMock.replay(druidServer1);
+
+    EasyMock.expect(druidServer2.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
+    ImmutableDruidServerTests.expectSegments(druidServer2, segments2);
+    EasyMock.replay(druidServer2);
+
+    EasyMock.expect(druidServer3.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
+    ImmutableDruidServerTests.expectSegments(druidServer3, segments3);
+    EasyMock.replay(druidServer3);
+
+    ImmutableDruidServerTests.expectSegments(druidServer4, segments4);
+    EasyMock.replay(druidServer4);
+
+    // Have to use anyTimes() because the number of times a segment on a given server is chosen is nondeterministic.
+    EasyMock.expect(holder1.getServer()).andReturn(druidServer1).anyTimes();
+    EasyMock.replay(holder1);
+    EasyMock.expect(holder2.getServer()).andReturn(druidServer2).anyTimes();
+    EasyMock.replay(holder2);
+    EasyMock.expect(holder3.getServer()).andReturn(druidServer3).anyTimes();
+    EasyMock.replay(holder3);
+    // We only call getServer() on holder4 when we calculate the limit on segments to consider, which happens once
+    // per iteration: always 5k calls.
+    EasyMock.expect(holder4.getServer()).andReturn(druidServer4).times(5000);
+    EasyMock.replay(holder4);
+
+    List<ServerHolder> holderList = new ArrayList<>();
+    holderList.add(holder1);
+    holderList.add(holder2);
+    holderList.add(holder3);
+    holderList.add(holder4);
+
+    Map<DataSegment, Integer> segmentCountMap = new HashMap<>();
+    for (int i = 0; i < iterations; i++) {
+      segmentCountMap.put(
+          ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet(), 75).getSegment(),
+          1
+      );
+    }
+
+    for (DataSegment segment : segments) {
+      if (!segment.equals(segment4)) {
+        Assert.assertEquals(new Integer(1), segmentCountMap.get(segment));
+      } else {
+        Assert.assertNull(segmentCountMap.get(segment));
+      }
     }
 
     EasyMock.verify(druidServer1, druidServer2, druidServer3, druidServer4);
diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
index 0050c0e0739..15c49a2de1a 100644
--- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
@@ -44,6 +44,7 @@ public class CoordinatorDynamicConfigTest
                      + "  \"mergeBytesLimit\": 1,\n"
                      + "  \"mergeSegmentsLimit\" : 1,\n"
                      + "  \"maxSegmentsToMove\": 1,\n"
+                     + "  \"percentOfSegmentsToConsiderPerMove\": 1,\n"
                      + "  \"replicantLifetime\": 1,\n"
                      + "  \"replicationThrottleLimit\": 1,\n"
                      + "  \"balancerComputeThreads\": 2, \n"
@@ -66,16 +67,19 @@ public class CoordinatorDynamicConfigTest
     );
     ImmutableSet<String> decommissioning = ImmutableSet.of("host1", "host2");
     ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false);
+    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false);
 
     actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9, false);
+    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9, false);
 
     actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false);
+    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false);
 
     actual = CoordinatorDynamicConfig.builder().withPauseCoordination(true).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true);
+    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true);
+
+    actual = CoordinatorDynamicConfig.builder().withPercentOfSegmentsToConsiderPerMove(10).build(actual);
+    assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true);
   }
 
   @Test
@@ -86,6 +90,7 @@ public class CoordinatorDynamicConfigTest
                      + "  \"mergeBytesLimit\": 1,\n"
                      + "  \"mergeSegmentsLimit\" : 1,\n"
                      + "  \"maxSegmentsToMove\": 1,\n"
+                     + "  \"percentOfSegmentsToConsiderPerMove\": 1,\n"
                      + "  \"replicantLifetime\": 1,\n"
                      + "  \"replicationThrottleLimit\": 1,\n"
                      + "  \"balancerComputeThreads\": 2, \n"
@@ -105,13 +110,13 @@ public class CoordinatorDynamicConfigTest
     );
     ImmutableSet<String> decommissioning = ImmutableSet.of();
     ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
ImmutableSet.of("test1", "test2"); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false); actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false); actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false); } @Test @@ -122,6 +127,7 @@ public class CoordinatorDynamicConfigTest + " \"mergeBytesLimit\": 1,\n" + " \"mergeSegmentsLimit\" : 1,\n" + " \"maxSegmentsToMove\": 1,\n" + + " \"percentOfSegmentsToConsiderPerMove\": 1,\n" + " \"replicantLifetime\": 1,\n" + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" @@ -139,7 +145,91 @@ public class CoordinatorDynamicConfigTest ), CoordinatorDynamicConfig.class ); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of("test1", "test2"), false, 1, ImmutableSet.of(), 0, false); + assertConfig( + actual, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 2, + true, + ImmutableSet.of("test1", "test2"), + false, + 1, + ImmutableSet.of(), + 0, + false + ); + } + + @Test + public void testSerdeCorrectsInvalidBadMaxPercentOfSegmentsToConsiderPerMove() throws Exception + { + try { + String jsonStr = "{\n" + + " \"percentOfSegmentsToConsiderPerMove\": 0\n" + + "}\n"; + + mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class + ); + + Assert.fail("deserialization should fail."); + } + catch (JsonMappingException e) { + Assert.assertTrue(e.getCause() instanceof IllegalArgumentException); + } + + try { + String jsonStr = "{\n" + + " \"percentOfSegmentsToConsiderPerMove\": -100\n" + + "}\n"; + + mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class + ); + + Assert.fail("deserialization should fail."); + } + catch (JsonMappingException e) { + Assert.assertTrue(e.getCause() instanceof IllegalArgumentException); + } + + try { + String jsonStr = "{\n" + + " \"percentOfSegmentsToConsiderPerMove\": 105\n" + + "}\n"; + + mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class + ); + + Assert.fail("deserialization should fail."); + } + catch (JsonMappingException e) { + Assert.assertTrue(e.getCause() instanceof IllegalArgumentException); + } } @Test @@ -150,6 +240,7 @@ public class CoordinatorDynamicConfigTest + " \"mergeBytesLimit\": 1,\n" + " \"mergeSegmentsLimit\" : 1,\n" + " \"maxSegmentsToMove\": 1,\n" + + " \"percentOfSegmentsToConsiderPerMove\": 1,\n" + " \"replicantLifetime\": 1,\n" + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" @@ -168,13 +259,14 @@ public class CoordinatorDynamicConfigTest CoordinatorDynamicConfig.class ); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0, false); + 
+    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0, false);
 
     //ensure whitelist is empty when killAllDataSources is true
     try {
       jsonStr = "{\n"
                 + "  \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n"
-                + "  \"killAllDataSources\": true\n"
+                + "  \"killAllDataSources\": true,\n"
+                + "  \"percentOfSegmentsToConsiderPerMove\": 1\n"
                 + "}\n";
       mapper.readValue(
           jsonStr,
@@ -196,6 +288,7 @@ public class CoordinatorDynamicConfigTest
                      + "  \"mergeBytesLimit\": 1,\n"
                      + "  \"mergeSegmentsLimit\" : 1,\n"
                      + "  \"maxSegmentsToMove\": 1,\n"
+                     + "  \"percentOfSegmentsToConsiderPerMove\": 1,\n"
                      + "  \"replicantLifetime\": 1,\n"
                      + "  \"replicationThrottleLimit\": 1,\n"
                      + "  \"balancerComputeThreads\": 2, \n"
@@ -213,7 +306,7 @@ public class CoordinatorDynamicConfigTest
         CoordinatorDynamicConfig.class
     );
 
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0, false);
+    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0, false);
   }
 
   @Test
@@ -221,7 +314,24 @@
   {
     CoordinatorDynamicConfig defaultConfig = CoordinatorDynamicConfig.builder().build();
     ImmutableSet<String> emptyList = ImmutableSet.of();
-    assertConfig(defaultConfig, 900000, 524288000, 100, 5, 15, 10, 1, false, emptyList, false, 0, emptyList, 70, false);
+    assertConfig(
+        defaultConfig,
+        900000,
+        524288000,
+        100,
+        5,
+        100,
+        15,
+        10,
+        1,
+        false,
+        emptyList,
+        false,
+        0,
+        emptyList,
+        70,
+        false
+    );
   }
 
   @Test
@@ -235,7 +345,7 @@
     Assert.assertEquals(
         current,
         new CoordinatorDynamicConfig
-            .Builder(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null)
+            .Builder(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null)
             .build(current)
     );
   }
@@ -255,6 +365,7 @@
       long expectedMergeBytesLimit,
       int expectedMergeSegmentsLimit,
       int expectedMaxSegmentsToMove,
+      int expectedPercentOfSegmentsToConsiderPerMove,
       int expectedReplicantLifetime,
      int expectedReplicationThrottleLimit,
       int expectedBalancerComputeThreads,
@@ -274,6 +385,7 @@
     Assert.assertEquals(expectedMergeBytesLimit, config.getMergeBytesLimit());
     Assert.assertEquals(expectedMergeSegmentsLimit, config.getMergeSegmentsLimit());
     Assert.assertEquals(expectedMaxSegmentsToMove, config.getMaxSegmentsToMove());
+    Assert.assertEquals(expectedPercentOfSegmentsToConsiderPerMove, config.getPercentOfSegmentsToConsiderPerMove(), 0);
     Assert.assertEquals(expectedReplicantLifetime, config.getReplicantLifetime());
     Assert.assertEquals(expectedReplicationThrottleLimit, config.getReplicationThrottleLimit());
     Assert.assertEquals(expectedBalancerComputeThreads, config.getBalancerComputeThreads());
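[Editor's note] The serde failures asserted above come from the constructor's Preconditions check; a minimal standalone reproduction of the round-trip is sketched below. The plain Jackson usage here is illustrative (the tests use Druid's injected mapper):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;

public class InvalidPercentExample
{
  public static void main(String[] args)
  {
    ObjectMapper mapper = new ObjectMapper();
    try {
      mapper.readValue(
          "{\"percentOfSegmentsToConsiderPerMove\": 0}",
          CoordinatorDynamicConfig.class
      );
    }
    catch (Exception e) {
      // Jackson wraps the IllegalArgumentException thrown by Preconditions.checkArgument
      // in a JsonMappingException, which is what the tests above unwrap with getCause().
      System.out.println(e.getCause());
    }
  }
}
```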
"decommissioningMaxPercentOfMaxSegmentsToMove", "type": "number", }, + Object { + "defaultValue": 100, + "info": + The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (the least available capacity first) and then iterates over the servers. For each server, Druid iterates over the segments on the server, considering them for moving. The default config of 100% means that every segment on every server is a candidate to be moved. This should make sense for most small to medium-sized clusters. However, an admin may find it preferable to drop this value lower if they don't think that it is worthwhile to consider every single segment in the cluster each time it is looking for a segment to move. + , + "name": "percentOfSegmentsToConsiderPerMove", + "type": "number", + }, Object { "defaultValue": false, "info": diff --git a/web-console/src/druid-models/coordinator-dynamic-config.tsx b/web-console/src/druid-models/coordinator-dynamic-config.tsx index b5621ce31f8..f2fa7d0c8ec 100644 --- a/web-console/src/druid-models/coordinator-dynamic-config.tsx +++ b/web-console/src/druid-models/coordinator-dynamic-config.tsx @@ -193,6 +193,23 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field[ ), }, + { + name: 'percentOfSegmentsToConsiderPerMove', + type: 'number', + defaultValue: 100, + info: ( + <> + The percentage of the total number of segments in the cluster that are considered every time + a segment needs to be selected for a move. Druid orders servers by available capacity + ascending (the least available capacity first) and then iterates over the servers. For each + server, Druid iterates over the segments on the server, considering them for moving. The + default config of 100% means that every segment on every server is a candidate to be moved. + This should make sense for most small to medium-sized clusters. However, an admin may find + it preferable to drop this value lower if they don't think that it is worthwhile to consider + every single segment in the cluster each time it is looking for a segment to move. + + ), + }, { name: 'pauseCoordination', type: 'boolean',