Add dynamic coordinator config that allows control over how many segments are considered when picking a segment to move. (#10284)

* dynamic coord config adding more balancing control

add new dynamic coordinator config, maxSegmentsToConsiderPerMove. This
config caps the number of segments that are iterated over when selecting
a segment to move. The default value combined with current balancing
strategies will still iterate over all provided segments. However,
setting this value to something > 0 will cap the number of segments
visited. This could make sense in cases where a cluster has a very large
number of segments and the admins prefer less iterations vs a thorough
consideration of all segments provided.

* fix checkstyle failure

* Make doc more detailed for admin to understand when/why to use new config

* refactor PR to use a % of segments instead of raw number

* update the docs

* remove bad doc line

* fix typo in name of new dynamic config

* update RservoirSegmentSampler to gracefully deal with values > 100%

* add handler for <= 0 in ReservoirSegmentSampler

* fixup CoordinatorDynamicConfigTest naming and argument ordering

* fix items in docs after spellcheck flags

* Fix lgtm flag on missing space in string literal

* improve documentation for new config

* Add default value to config docs and add advice in cluster tuning doc

* Add percentOfSegmentsToConsiderPerMove to web console coord config dialog

* update jest snapshot after console change

* fix spell checker errors

* Improve debug logging in getRandomSegmentBalancerHolder to cover all bad inputs for % of segments to consider

* add new config back to web console module after merge with master

* fix ReservoirSegmentSamplerTest

* fix line breaks in coordinator console dialog

* Add a test that helps ensure not regressions for percentOfSegmentsToConsiderPerMove

* Make improvements based off of feedback in review

* additional cleanup coming from review

* Add a warning log if limit on segments to consider for move can't be calcluated

* remove unused import

* fix tests for CoordinatorDynamicConfig

* remove precondition test that is redundant in CoordinatorDynamicConfig Builder class
This commit is contained in:
Lucas Capistrant 2020-12-22 10:27:55 -06:00 committed by GitHub
parent 5bd7924296
commit 58ce2e55d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 450 additions and 33 deletions

View File

@ -746,6 +746,7 @@ A sample Coordinator dynamic config JSON object is shown below:
"mergeBytesLimit": 100000000,
"mergeSegmentsLimit" : 1000,
"maxSegmentsToMove": 5,
"percentOfSegmentsToConsiderPerMove": 100,
"replicantLifetime": 15,
"replicationThrottleLimit": 10,
"emitBalancingStats": false,
@ -764,6 +765,7 @@ Issuing a GET request at the same URL will return the spec that is currently in
|`mergeBytesLimit`|The maximum total uncompressed size in bytes of segments to merge.|524288000L|
|`mergeSegmentsLimit`|The maximum number of segments that can be in a single [append task](../ingestion/tasks.md).|100|
|`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|5|
|`percentOfSegmentsToConsiderPerMove`|The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (the least available capacity first) and then iterates over the servers. For each server, Druid iterates over the segments on the server, considering them for moving. The default config of 100% means that every segment on every server is a candidate to be moved. This should make sense for most small to medium-sized clusters. However, an admin may find it preferable to drop this value lower if they don't think that it is worthwhile to consider every single segment in the cluster each time it is looking for a segment to move.|100|
|`replicantLifetime`|The maximum number of Coordinator runs for a segment to be replicated before we start alerting.|15|
|`replicationThrottleLimit`|The maximum number of segments that can be replicated at one time.|10|
|`balancerComputeThreads`|Thread pool size for computing moving cost of segments in segment balancing. Consider increasing this if you have a lot of segments and moving segments starts to get stuck.|1|

View File

@ -280,6 +280,23 @@ The heap requirements of the Coordinator scale with the number of servers, segme
You can set the Coordinator heap to the same size as your Broker heap, or slightly smaller: both services have to process cluster-wide state and answer API requests about this state.
#### Dynamic Configuration
`percentOfSegmentsToConsiderPerMove`
* The default value is 100. This means that the Coordinator will consider all segments when it is looking for a segment to move. The Coordinator makes a weighted choice, with segments on Servers with the least capacity being the most likely segments to be moved.
* This weighted selection strategy means that the segments on the servers who have the most available capacity are the least likely to be chosen.
* As the number of segments in the cluster increases, the probability of choosing the Nth segment to move decreases; where N is the last segment considered for moving.
* An admin can use this config to skip consideration of that Nth segment.
* Instead of skipping a precise amount of segments, we skip a percentage of segments in the cluster.
* For example, with the value set to 25, only the first 25% of segments will be considered as a segment that can be moved. This 25% of segments will come from the servers that have the least available capacity.
* In this example, each time the Coordinator looks for a segment to move, it will consider 75% less segments than it did when the configuration was 100. On clusters with hundreds of thousands of segments, this can add up to meaningful coordination time savings.
* General recommendations for this configuration:
* If you are not worried about the amount of time it takes your Coordinator to complete a full coordination cycle, you likely do not need to modify this config.
* If you are frustrated with how long the Coordinator takes to run a full coordination cycle, and you have set the Coordinator dynamic config `maxSegmentsToMove` to a value above 0 (the default is 5), setting this config to a non-default value can help shorten coordination time.
* The recommended starting point value is 66. It represents a meaningful decrease in the percentage of segments considered while also not being too aggressive (You will consider 1/3 fewer segments per move operation with this value).
* The impact that modifying this config will have on your coordination time will be a function of how low you set the config value, the value for `maxSegmentsToMove` and the total number of segments in your cluster.
* If your cluster has a relatively small number of segments, or you choose to move few segments per coordination cycle, there may not be much savings to be had here.
### Overlord
The main performance-related setting on the Overlord is the heap size.

View File

@ -63,11 +63,20 @@ public interface BalancerStrategy
* NOTE: this should really be handled on a per-segment basis, to properly support
* the interval or period-based broadcast rules. For simplicity of the initial
* implementation, only forever broadcast rules are supported.
* @param percentOfSegmentsToConsider The percentage of the total number of segments that we will consider when
* choosing which segment to move. {@link CoordinatorDynamicConfig} defines a
* config percentOfSegmentsToConsiderPerMove that will be used as an argument
* for implementations of this method.
*
* @return {@link BalancerSegmentHolder} containing segment to move and server it currently resides on, or null if
* there are no segments to pick from (i. e. all provided serverHolders are empty).
*/
@Nullable
BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders, Set<String> broadcastDatasources);
BalancerSegmentHolder pickSegmentToMove(
List<ServerHolder> serverHolders,
Set<String> broadcastDatasources,
double percentOfSegmentsToConsider
);
/**
* Returns an iterator for a set of servers to drop from, ordered by preference of which server to drop from first

View File

@ -53,6 +53,7 @@ public class CoordinatorDynamicConfig
private final long mergeBytesLimit;
private final int mergeSegmentsLimit;
private final int maxSegmentsToMove;
private final double percentOfSegmentsToConsiderPerMove;
private final int replicantLifetime;
private final int replicationThrottleLimit;
private final int balancerComputeThreads;
@ -95,6 +96,7 @@ public class CoordinatorDynamicConfig
@JsonProperty("mergeBytesLimit") long mergeBytesLimit,
@JsonProperty("mergeSegmentsLimit") int mergeSegmentsLimit,
@JsonProperty("maxSegmentsToMove") int maxSegmentsToMove,
@JsonProperty("percentOfSegmentsToConsiderPerMove") double percentOfSegmentsToConsiderPerMove,
@JsonProperty("replicantLifetime") int replicantLifetime,
@JsonProperty("replicationThrottleLimit") int replicationThrottleLimit,
@JsonProperty("balancerComputeThreads") int balancerComputeThreads,
@ -123,6 +125,13 @@ public class CoordinatorDynamicConfig
this.mergeBytesLimit = mergeBytesLimit;
this.mergeSegmentsLimit = mergeSegmentsLimit;
this.maxSegmentsToMove = maxSegmentsToMove;
Preconditions.checkArgument(
percentOfSegmentsToConsiderPerMove > 0 && percentOfSegmentsToConsiderPerMove <= 100,
"percentOfSegmentsToConsiderPerMove should be between 1 and 100!"
);
this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove;
this.replicantLifetime = replicantLifetime;
this.replicationThrottleLimit = replicationThrottleLimit;
this.balancerComputeThreads = Math.max(balancerComputeThreads, 1);
@ -211,6 +220,12 @@ public class CoordinatorDynamicConfig
return maxSegmentsToMove;
}
@JsonProperty
public double getPercentOfSegmentsToConsiderPerMove()
{
return percentOfSegmentsToConsiderPerMove;
}
@JsonProperty
public int getReplicantLifetime()
{
@ -302,6 +317,7 @@ public class CoordinatorDynamicConfig
", mergeBytesLimit=" + mergeBytesLimit +
", mergeSegmentsLimit=" + mergeSegmentsLimit +
", maxSegmentsToMove=" + maxSegmentsToMove +
", percentOfSegmentsToConsiderPerMove=" + percentOfSegmentsToConsiderPerMove +
", replicantLifetime=" + replicantLifetime +
", replicationThrottleLimit=" + replicationThrottleLimit +
", balancerComputeThreads=" + balancerComputeThreads +
@ -341,6 +357,9 @@ public class CoordinatorDynamicConfig
if (maxSegmentsToMove != that.maxSegmentsToMove) {
return false;
}
if (percentOfSegmentsToConsiderPerMove != that.percentOfSegmentsToConsiderPerMove) {
return false;
}
if (replicantLifetime != that.replicantLifetime) {
return false;
}
@ -382,6 +401,7 @@ public class CoordinatorDynamicConfig
mergeBytesLimit,
mergeSegmentsLimit,
maxSegmentsToMove,
percentOfSegmentsToConsiderPerMove,
replicantLifetime,
replicationThrottleLimit,
balancerComputeThreads,
@ -408,6 +428,7 @@ public class CoordinatorDynamicConfig
private static final long DEFAULT_MERGE_BYTES_LIMIT = 524_288_000L;
private static final int DEFAULT_MERGE_SEGMENTS_LIMIT = 100;
private static final int DEFAULT_MAX_SEGMENTS_TO_MOVE = 5;
private static final int DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE = 100;
private static final int DEFAULT_REPLICANT_LIFETIME = 15;
private static final int DEFAULT_REPLICATION_THROTTLE_LIMIT = 10;
private static final int DEFAULT_BALANCER_COMPUTE_THREADS = 1;
@ -421,6 +442,7 @@ public class CoordinatorDynamicConfig
private Long mergeBytesLimit;
private Integer mergeSegmentsLimit;
private Integer maxSegmentsToMove;
private Double percentOfSegmentsToConsiderPerMove;
private Integer replicantLifetime;
private Integer replicationThrottleLimit;
private Boolean emitBalancingStats;
@ -444,6 +466,7 @@ public class CoordinatorDynamicConfig
@JsonProperty("mergeBytesLimit") @Nullable Long mergeBytesLimit,
@JsonProperty("mergeSegmentsLimit") @Nullable Integer mergeSegmentsLimit,
@JsonProperty("maxSegmentsToMove") @Nullable Integer maxSegmentsToMove,
@JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove,
@JsonProperty("replicantLifetime") @Nullable Integer replicantLifetime,
@JsonProperty("replicationThrottleLimit") @Nullable Integer replicationThrottleLimit,
@JsonProperty("balancerComputeThreads") @Nullable Integer balancerComputeThreads,
@ -463,6 +486,7 @@ public class CoordinatorDynamicConfig
this.mergeBytesLimit = mergeBytesLimit;
this.mergeSegmentsLimit = mergeSegmentsLimit;
this.maxSegmentsToMove = maxSegmentsToMove;
this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove;
this.replicantLifetime = replicantLifetime;
this.replicationThrottleLimit = replicationThrottleLimit;
this.balancerComputeThreads = balancerComputeThreads;
@ -500,6 +524,12 @@ public class CoordinatorDynamicConfig
return this;
}
public Builder withPercentOfSegmentsToConsiderPerMove(double percentOfSegmentsToConsiderPerMove)
{
this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove;
return this;
}
public Builder withReplicantLifetime(int replicantLifetime)
{
this.replicantLifetime = replicantLifetime;
@ -569,6 +599,8 @@ public class CoordinatorDynamicConfig
mergeBytesLimit == null ? DEFAULT_MERGE_BYTES_LIMIT : mergeBytesLimit,
mergeSegmentsLimit == null ? DEFAULT_MERGE_SEGMENTS_LIMIT : mergeSegmentsLimit,
maxSegmentsToMove == null ? DEFAULT_MAX_SEGMENTS_TO_MOVE : maxSegmentsToMove,
percentOfSegmentsToConsiderPerMove == null ? DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
: percentOfSegmentsToConsiderPerMove,
replicantLifetime == null ? DEFAULT_REPLICANT_LIFETIME : replicantLifetime,
replicationThrottleLimit == null ? DEFAULT_REPLICATION_THROTTLE_LIMIT : replicationThrottleLimit,
balancerComputeThreads == null ? DEFAULT_BALANCER_COMPUTE_THREADS : balancerComputeThreads,
@ -598,6 +630,7 @@ public class CoordinatorDynamicConfig
mergeBytesLimit == null ? defaults.getMergeBytesLimit() : mergeBytesLimit,
mergeSegmentsLimit == null ? defaults.getMergeSegmentsLimit() : mergeSegmentsLimit,
maxSegmentsToMove == null ? defaults.getMaxSegmentsToMove() : maxSegmentsToMove,
percentOfSegmentsToConsiderPerMove == null ? defaults.getPercentOfSegmentsToConsiderPerMove() : percentOfSegmentsToConsiderPerMove,
replicantLifetime == null ? defaults.getReplicantLifetime() : replicantLifetime,
replicationThrottleLimit == null ? defaults.getReplicationThrottleLimit() : replicationThrottleLimit,
balancerComputeThreads == null ? defaults.getBalancerComputeThreads() : balancerComputeThreads,

View File

@ -214,10 +214,15 @@ public class CostBalancerStrategy implements BalancerStrategy
@Override
public BalancerSegmentHolder pickSegmentToMove(
final List<ServerHolder> serverHolders,
Set<String> broadcastDatasources
Set<String> broadcastDatasources,
double percentOfSegmentsToConsider
)
{
return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders, broadcastDatasources);
return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(
serverHolders,
broadcastDatasources,
percentOfSegmentsToConsider
);
}
@Override

View File

@ -54,9 +54,17 @@ public class RandomBalancerStrategy implements BalancerStrategy
}
@Override
public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders, Set<String> broadcastDatasources)
public BalancerSegmentHolder pickSegmentToMove(
List<ServerHolder> serverHolders,
Set<String> broadcastDatasources,
double percentOfSegmentsToConsider
)
{
return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders, broadcastDatasources);
return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(
serverHolders,
broadcastDatasources,
percentOfSegmentsToConsider
);
}
@Override

View File

@ -19,6 +19,7 @@
package org.apache.druid.server.coordinator;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.timeline.DataSegment;
import java.util.List;
@ -28,15 +29,54 @@ import java.util.concurrent.ThreadLocalRandom;
final class ReservoirSegmentSampler
{
private static final EmittingLogger log = new EmittingLogger(ReservoirSegmentSampler.class);
/**
* Iterates over segments that live on the candidate servers passed in {@link ServerHolder} and (possibly) picks a
* segment to return to caller in a {@link BalancerSegmentHolder} object.
*
* @param serverHolders List of {@link ServerHolder} objects containing segments who are candidates to be chosen.
* @param broadcastDatasources Set of DataSource names that identify broadcast datasources. We don't want to consider
* segments from these datasources.
* @param percentOfSegmentsToConsider The % of total cluster segments to consider before short-circuiting and
* returning immediately.
* @return
*/
static BalancerSegmentHolder getRandomBalancerSegmentHolder(
final List<ServerHolder> serverHolders,
Set<String> broadcastDatasources
Set<String> broadcastDatasources,
double percentOfSegmentsToConsider
)
{
ServerHolder fromServerHolder = null;
DataSegment proposalSegment = null;
int calculatedSegmentLimit = Integer.MAX_VALUE;
int numSoFar = 0;
// Reset a bad value of percentOfSegmentsToConsider to 100. We don't allow consideration less than or equal to
// 0% of segments or greater than 100% of segments.
if (percentOfSegmentsToConsider <= 0 || percentOfSegmentsToConsider > 100) {
log.warn("Resetting percentOfSegmentsToConsider to 100 because only values from 1 to 100 are allowed."
+ " You Provided [%f]", percentOfSegmentsToConsider);
percentOfSegmentsToConsider = 100;
}
// Calculate the integer limit for the number of segments to be considered for moving if % is less than 100
if (percentOfSegmentsToConsider < 100) {
int totalSegments = 0;
for (ServerHolder server : serverHolders) {
totalSegments += server.getServer().getNumSegments();
}
// If totalSegments are zero, we will assume it is a mistake and move on to iteration without updating
// calculatedSegmentLimit
if (totalSegments != 0) {
calculatedSegmentLimit = (int) Math.ceil((double) totalSegments * (percentOfSegmentsToConsider / 100.0));
} else {
log.warn("Unable to calculate limit on segments to consider because ServerHolder collection indicates"
+ " zero segments existing in the cluster.");
}
}
for (ServerHolder server : serverHolders) {
if (!server.getServer().getType().isSegmentReplicationTarget()) {
// if the server only handles broadcast segments (which don't need to be rebalanced), we have nothing to do
@ -56,6 +96,19 @@ final class ReservoirSegmentSampler
proposalSegment = segment;
}
numSoFar++;
// We have iterated over the alloted number of segments and will return the currently proposed segment or null
// We will only break out early if we are iterating less than 100% of the total cluster segments
if (percentOfSegmentsToConsider < 100 && numSoFar >= calculatedSegmentLimit) {
log.debug("Breaking out of iteration over potential segments to move because we hit the limit [%f percent] of"
+ " segments to consider to move. Segments Iterated: [%d]", percentOfSegmentsToConsider, numSoFar);
break;
}
}
// We have iterated over the alloted number of segments and will return the currently proposed segment or null
// We will only break out early if we are iterating less than 100% of the total cluster segments
if (percentOfSegmentsToConsider < 100 && numSoFar >= calculatedSegmentLimit) {
break;
}
}
if (fromServerHolder != null) {

View File

@ -189,7 +189,8 @@ public class BalanceSegments implements CoordinatorDuty
for (int iter = 0; (moved + unmoved) < maxSegmentsToMove; ++iter) {
final BalancerSegmentHolder segmentToMoveHolder = strategy.pickSegmentToMove(
toMoveFrom,
params.getBroadcastDatasources()
params.getBroadcastDatasources(),
params.getCoordinatorDynamicConfig().getPercentOfSegmentsToConsiderPerMove()
);
if (segmentToMoveHolder == null) {
log.info("All servers to move segments from are empty, ending run.");

View File

@ -232,10 +232,19 @@ public class BalanceSegmentsTest
mockCoordinator(coordinator);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, false)), broadcastDatasources))
.andReturn(new BalancerSegmentHolder(druidServer2, segment3))
.andReturn(new BalancerSegmentHolder(druidServer2, segment4));
EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
EasyMock.expect(
strategy.pickSegmentToMove(
ImmutableList.of(
new ServerHolder(druidServer2, peon2, false)
),
broadcastDatasources,
100
)
).andReturn(
new BalancerSegmentHolder(druidServer2, segment3)).andReturn(new BalancerSegmentHolder(druidServer2, segment4)
);
EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(new BalancerSegmentHolder(druidServer1, segment1))
.andReturn(new BalancerSegmentHolder(druidServer1, segment2));
@ -300,7 +309,7 @@ public class BalanceSegmentsTest
mockCoordinator(coordinator);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(new BalancerSegmentHolder(druidServer1, segment1))
.andReturn(new BalancerSegmentHolder(druidServer1, segment2))
.andReturn(new BalancerSegmentHolder(druidServer2, segment3))
@ -349,7 +358,7 @@ public class BalanceSegmentsTest
mockCoordinator(coordinator);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(new BalancerSegmentHolder(druidServer1, segment1))
.anyTimes();
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())).andAnswer(() -> {
@ -384,7 +393,7 @@ public class BalanceSegmentsTest
ServerHolder holder2 = new ServerHolder(druidServer2, peon2, false);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(new BalancerSegmentHolder(druidServer1, segment1))
.once();
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject()))
@ -527,6 +536,76 @@ public class BalanceSegmentsTest
Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
}
/**
* Testing that the dynamic coordinator config value, percentOfSegmentsToConsiderPerMove, is honored when calling
* out to pickSegmentToMove. This config limits the number of segments that are considered when looking for a segment
* to move.
*/
@Test
public void testThatDynamicConfigIsHonoredWhenPickingSegmentToMove()
{
mockDruidServer(druidServer1, "1", "normal", 50L, 100L, Arrays.asList(segment1, segment2));
mockDruidServer(druidServer2, "2", "normal", 30L, 100L, Arrays.asList(segment3, segment4));
mockDruidServer(druidServer3, "3", "normal", 0L, 100L, Collections.emptyList());
EasyMock.replay(druidServer4);
mockCoordinator(coordinator);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
// The first call for decommissioning servers
EasyMock.expect(
strategy.pickSegmentToMove(
ImmutableList.of(),
broadcastDatasources,
40
)
)
.andReturn(null);
// The second call for the single non decommissioning server move
EasyMock.expect(
strategy.pickSegmentToMove(
ImmutableList.of(
new ServerHolder(druidServer3, peon3, false),
new ServerHolder(druidServer2, peon2, false),
new ServerHolder(druidServer1, peon1, false)
),
broadcastDatasources,
40
)
)
.andReturn(new BalancerSegmentHolder(druidServer2, segment3));
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(new ServerHolder(druidServer3, peon3))
.anyTimes();
EasyMock.replay(strategy);
DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2, druidServer3),
ImmutableList.of(peon1, peon2, peon3),
ImmutableList.of(false, false, false)
)
.withDynamicConfigs(
CoordinatorDynamicConfig.builder()
.withMaxSegmentsToMove(1)
.withPercentOfSegmentsToConsiderPerMove(40)
.build()
)
.withBalancerStrategy(strategy)
.withBroadcastDatasources(broadcastDatasources)
.build();
params = new BalanceSegmentsTester(coordinator).run(params);
Assert.assertEquals(1L, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
Assert.assertThat(
peon3.getSegmentsToLoad(),
Matchers.is(Matchers.equalTo(ImmutableSet.of(segment3)))
);
}
private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder(
List<ImmutableDruidServer> druidServers,
List<LoadQueuePeon> peons
@ -637,7 +716,11 @@ public class BalanceSegmentsTest
}
@Override
public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders, Set<String> broadcastDatasources)
public BalancerSegmentHolder pickSegmentToMove(
List<ServerHolder> serverHolders,
Set<String> broadcastDatasources,
double percentOfSegmentsToConsider
)
{
return pickOrder.get(pickCounter.getAndIncrement() % pickOrder.size());
}
@ -661,9 +744,18 @@ public class BalanceSegmentsTest
// either decommissioning servers list or acitve ones (ie servers list is [2] or [1, 3])
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, true)), broadcastDatasources))
.andReturn(new BalancerSegmentHolder(druidServer2, segment2));
EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
EasyMock.expect(
strategy.pickSegmentToMove(
ImmutableList.of(
new ServerHolder(druidServer2, peon2, true)
),
broadcastDatasources,
100
)
).andReturn(
new BalancerSegmentHolder(druidServer2, segment2)
);
EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt()))
.andReturn(new BalancerSegmentHolder(druidServer1, segment1));
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(new ServerHolder(druidServer3, peon3))

View File

@ -174,14 +174,74 @@ public class ReservoirSegmentSamplerTest
Map<DataSegment, Integer> segmentCountMap = new HashMap<>();
for (int i = 0; i < iterations; i++) {
// due to the pseudo-randomness of this method, we may not select a segment every single time no matter what.
BalancerSegmentHolder balancerSegmentHolder = ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet());
BalancerSegmentHolder balancerSegmentHolder = ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet(), 100);
if (balancerSegmentHolder != null) {
segmentCountMap.put(balancerSegmentHolder.getSegment(), 1);
}
}
for (DataSegment segment : segments) {
Assert.assertEquals(segmentCountMap.get(segment), new Integer(1));
Assert.assertEquals(new Integer(1), segmentCountMap.get(segment));
}
EasyMock.verify(druidServer1, druidServer2, druidServer3, druidServer4);
EasyMock.verify(holder1, holder2, holder3, holder4);
}
/**
* Makes sure that the segment on server4 is never chosen in 5k iterations because it should never have its segment
* checked due to the limit on segment candidates
*/
@Test
public void getRandomBalancerSegmentHolderTestSegmentsToConsiderLimit()
{
int iterations = 5000;
EasyMock.expect(druidServer1.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
ImmutableDruidServerTests.expectSegments(druidServer1, segments1);
EasyMock.replay(druidServer1);
EasyMock.expect(druidServer2.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
ImmutableDruidServerTests.expectSegments(druidServer2, segments2);
EasyMock.replay(druidServer2);
EasyMock.expect(druidServer3.getType()).andReturn(ServerType.HISTORICAL).times(iterations);
ImmutableDruidServerTests.expectSegments(druidServer3, segments3);
EasyMock.replay(druidServer3);
ImmutableDruidServerTests.expectSegments(druidServer4, segments4);
EasyMock.replay(druidServer4);
// Have to use anyTimes() because the number of times a segment on a given server is chosen is indetermistic.
EasyMock.expect(holder1.getServer()).andReturn(druidServer1).anyTimes();
EasyMock.replay(holder1);
EasyMock.expect(holder2.getServer()).andReturn(druidServer2).anyTimes();
EasyMock.replay(holder2);
EasyMock.expect(holder3.getServer()).andReturn(druidServer3).anyTimes();
EasyMock.replay(holder3);
// We only run getServer() each time we calculate the limit on segments to consider. Always 5k
EasyMock.expect(holder4.getServer()).andReturn(druidServer4).times(5000);
EasyMock.replay(holder4);
List<ServerHolder> holderList = new ArrayList<>();
holderList.add(holder1);
holderList.add(holder2);
holderList.add(holder3);
holderList.add(holder4);
Map<DataSegment, Integer> segmentCountMap = new HashMap<>();
for (int i = 0; i < iterations; i++) {
segmentCountMap.put(
ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet(), 75).getSegment(), 1
);
}
for (DataSegment segment : segments) {
if (!segment.equals(segment4)) {
Assert.assertEquals(new Integer(1), segmentCountMap.get(segment));
} else {
Assert.assertNull(segmentCountMap.get(segment));
}
}
EasyMock.verify(druidServer1, druidServer2, druidServer3, druidServer4);

View File

@ -44,6 +44,7 @@ public class CoordinatorDynamicConfigTest
+ " \"mergeBytesLimit\": 1,\n"
+ " \"mergeSegmentsLimit\" : 1,\n"
+ " \"maxSegmentsToMove\": 1,\n"
+ " \"percentOfSegmentsToConsiderPerMove\": 1,\n"
+ " \"replicantLifetime\": 1,\n"
+ " \"replicationThrottleLimit\": 1,\n"
+ " \"balancerComputeThreads\": 2, \n"
@ -66,16 +67,19 @@ public class CoordinatorDynamicConfigTest
);
ImmutableSet<String> decommissioning = ImmutableSet.of("host1", "host2");
ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false);
actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9, false);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9, false);
actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false);
actual = CoordinatorDynamicConfig.builder().withPauseCoordination(true).build(actual);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true);
actual = CoordinatorDynamicConfig.builder().withPercentOfSegmentsToConsiderPerMove(10).build(actual);
assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true);
}
@Test
@ -86,6 +90,7 @@ public class CoordinatorDynamicConfigTest
+ " \"mergeBytesLimit\": 1,\n"
+ " \"mergeSegmentsLimit\" : 1,\n"
+ " \"maxSegmentsToMove\": 1,\n"
+ " \"percentOfSegmentsToConsiderPerMove\": 1,\n"
+ " \"replicantLifetime\": 1,\n"
+ " \"replicationThrottleLimit\": 1,\n"
+ " \"balancerComputeThreads\": 2, \n"
@ -105,13 +110,13 @@ public class CoordinatorDynamicConfigTest
);
ImmutableSet<String> decommissioning = ImmutableSet.of();
ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false);
actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false);
actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false);
}
@Test
@ -122,6 +127,7 @@ public class CoordinatorDynamicConfigTest
+ " \"mergeBytesLimit\": 1,\n"
+ " \"mergeSegmentsLimit\" : 1,\n"
+ " \"maxSegmentsToMove\": 1,\n"
+ " \"percentOfSegmentsToConsiderPerMove\": 1,\n"
+ " \"replicantLifetime\": 1,\n"
+ " \"replicationThrottleLimit\": 1,\n"
+ " \"balancerComputeThreads\": 2, \n"
@ -139,7 +145,91 @@ public class CoordinatorDynamicConfigTest
),
CoordinatorDynamicConfig.class
);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of("test1", "test2"), false, 1, ImmutableSet.of(), 0, false);
assertConfig(
actual,
1,
1,
1,
1,
1,
1,
1,
2,
true,
ImmutableSet.of("test1", "test2"),
false,
1,
ImmutableSet.of(),
0,
false
);
}
@Test
public void testSerdeCorrectsInvalidBadMaxPercentOfSegmentsToConsiderPerMove() throws Exception
{
try {
String jsonStr = "{\n"
+ " \"percentOfSegmentsToConsiderPerMove\": 0\n"
+ "}\n";
mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
CoordinatorDynamicConfig.class
)
),
CoordinatorDynamicConfig.class
);
Assert.fail("deserialization should fail.");
}
catch (JsonMappingException e) {
Assert.assertTrue(e.getCause() instanceof IllegalArgumentException);
}
try {
String jsonStr = "{\n"
+ " \"percentOfSegmentsToConsiderPerMove\": -100\n"
+ "}\n";
mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
CoordinatorDynamicConfig.class
)
),
CoordinatorDynamicConfig.class
);
Assert.fail("deserialization should fail.");
}
catch (JsonMappingException e) {
Assert.assertTrue(e.getCause() instanceof IllegalArgumentException);
}
try {
String jsonStr = "{\n"
+ " \"percentOfSegmentsToConsiderPerMove\": 105\n"
+ "}\n";
mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
CoordinatorDynamicConfig.class
)
),
CoordinatorDynamicConfig.class
);
Assert.fail("deserialization should fail.");
}
catch (JsonMappingException e) {
Assert.assertTrue(e.getCause() instanceof IllegalArgumentException);
}
}
@Test
@ -150,6 +240,7 @@ public class CoordinatorDynamicConfigTest
+ " \"mergeBytesLimit\": 1,\n"
+ " \"mergeSegmentsLimit\" : 1,\n"
+ " \"maxSegmentsToMove\": 1,\n"
+ " \"percentOfSegmentsToConsiderPerMove\": 1,\n"
+ " \"replicantLifetime\": 1,\n"
+ " \"replicationThrottleLimit\": 1,\n"
+ " \"balancerComputeThreads\": 2, \n"
@ -168,13 +259,14 @@ public class CoordinatorDynamicConfigTest
CoordinatorDynamicConfig.class
);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0, false);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0, false);
//ensure whitelist is empty when killAllDataSources is true
try {
jsonStr = "{\n"
+ " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n"
+ " \"killAllDataSources\": true\n"
+ " \"killAllDataSources\": true,\n"
+ " \"percentOfSegmentsToConsiderPerMove\": 1\n"
+ "}\n";
mapper.readValue(
jsonStr,
@ -196,6 +288,7 @@ public class CoordinatorDynamicConfigTest
+ " \"mergeBytesLimit\": 1,\n"
+ " \"mergeSegmentsLimit\" : 1,\n"
+ " \"maxSegmentsToMove\": 1,\n"
+ " \"percentOfSegmentsToConsiderPerMove\": 1,\n"
+ " \"replicantLifetime\": 1,\n"
+ " \"replicationThrottleLimit\": 1,\n"
+ " \"balancerComputeThreads\": 2, \n"
@ -213,7 +306,7 @@ public class CoordinatorDynamicConfigTest
CoordinatorDynamicConfig.class
);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0, false);
assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0, false);
}
@Test
@ -221,7 +314,24 @@ public class CoordinatorDynamicConfigTest
{
CoordinatorDynamicConfig defaultConfig = CoordinatorDynamicConfig.builder().build();
ImmutableSet<String> emptyList = ImmutableSet.of();
assertConfig(defaultConfig, 900000, 524288000, 100, 5, 15, 10, 1, false, emptyList, false, 0, emptyList, 70, false);
assertConfig(
defaultConfig,
900000,
524288000,
100,
5,
100,
15,
10,
1,
false,
emptyList,
false,
0,
emptyList,
70,
false
);
}
@Test
@ -235,7 +345,7 @@ public class CoordinatorDynamicConfigTest
Assert.assertEquals(
current,
new CoordinatorDynamicConfig
.Builder(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null)
.Builder(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null)
.build(current)
);
}
@ -255,6 +365,7 @@ public class CoordinatorDynamicConfigTest
long expectedMergeBytesLimit,
int expectedMergeSegmentsLimit,
int expectedMaxSegmentsToMove,
int expectedPercentOfSegmentsToConsiderPerMove,
int expectedReplicantLifetime,
int expectedReplicationThrottleLimit,
int expectedBalancerComputeThreads,
@ -274,6 +385,7 @@ public class CoordinatorDynamicConfigTest
Assert.assertEquals(expectedMergeBytesLimit, config.getMergeBytesLimit());
Assert.assertEquals(expectedMergeSegmentsLimit, config.getMergeSegmentsLimit());
Assert.assertEquals(expectedMaxSegmentsToMove, config.getMaxSegmentsToMove());
Assert.assertEquals(expectedPercentOfSegmentsToConsiderPerMove, config.getPercentOfSegmentsToConsiderPerMove(), 0);
Assert.assertEquals(expectedReplicantLifetime, config.getReplicantLifetime());
Assert.assertEquals(expectedReplicationThrottleLimit, config.getReplicationThrottleLimit());
Assert.assertEquals(expectedBalancerComputeThreads, config.getBalancerComputeThreads());

View File

@ -167,6 +167,14 @@ exports[`coordinator dynamic config matches snapshot 1`] = `
"name": "decommissioningMaxPercentOfMaxSegmentsToMove",
"type": "number",
},
Object {
"defaultValue": 100,
"info": <React.Fragment>
The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (the least available capacity first) and then iterates over the servers. For each server, Druid iterates over the segments on the server, considering them for moving. The default config of 100% means that every segment on every server is a candidate to be moved. This should make sense for most small to medium-sized clusters. However, an admin may find it preferable to drop this value lower if they don't think that it is worthwhile to consider every single segment in the cluster each time it is looking for a segment to move.
</React.Fragment>,
"name": "percentOfSegmentsToConsiderPerMove",
"type": "number",
},
Object {
"defaultValue": false,
"info": <React.Fragment>

View File

@ -193,6 +193,23 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field<CoordinatorDynamicConfig>[
</>
),
},
{
name: 'percentOfSegmentsToConsiderPerMove',
type: 'number',
defaultValue: 100,
info: (
<>
The percentage of the total number of segments in the cluster that are considered every time
a segment needs to be selected for a move. Druid orders servers by available capacity
ascending (the least available capacity first) and then iterates over the servers. For each
server, Druid iterates over the segments on the server, considering them for moving. The
default config of 100% means that every segment on every server is a candidate to be moved.
This should make sense for most small to medium-sized clusters. However, an admin may find
it preferable to drop this value lower if they don't think that it is worthwhile to consider
every single segment in the cluster each time it is looking for a segment to move.
</>
),
},
{
name: 'pauseCoordination',
type: 'boolean',