Adapt maxSegmentsToMove based on cluster skew (#14584)

Changes:
- No change in behaviour if `smartSegmentLoading` is disabled
- If `smartSegmentLoading` is enabled
  - Compute `balancerComputeThreads` based on `numUsedSegments`
  - Compute `maxSegmentsToMove` based on `balancerComputeThreads`
  - Compute `segmentsToMoveToFixSkew` based on usage skew
  - Compute `segmentsToMove = Math.min(maxSegmentsToMove, segmentsToMoveToFixSkew)`

Limits:
- 1 <= `balancerComputeThreads` <= 8
- `maxSegmentsToMove` <= 20% of total segments
- `minSegmentsToMove` = 0.15% of total segments
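
To illustrate how these pieces compose, here is a minimal sketch (not part of
this commit) that only calls the utilities added in the diff below, assuming a
cluster of 1M used segments and the default 60s coordinator period:

    int numUsedSegments = 1_000_000;
    int balancerComputeThreads =
        SegmentLoadingConfig.computeNumBalancerThreads(numUsedSegments);        // 8
    int maxSegmentsToMove = SegmentToMoveCalculator.computeMaxSegmentsToMovePerTier(
        numUsedSegments, balancerComputeThreads, Duration.standardMinutes(1));  // 16,000
    // Per tier, the balancer then picks:
    // segmentsToMove = min(max(minSegmentsToMove, segmentsToMoveToFixSkew), maxSegmentsToMove)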
Kashif Faraz 2023-08-17 11:14:54 +05:30 committed by GitHub
parent cb27d0d2ed
commit 5d4ac64178
23 changed files with 940 additions and 164 deletions

View File

@ -542,26 +542,37 @@ public class DruidCoordinator
}
}
/**
* Resets the balancerExec if required and creates a new BalancerStrategy for
* the current coordinator run.
*/
@VisibleForTesting
protected void initBalancerExecutor()
BalancerStrategy createBalancerStrategy(int balancerComputeThreads)
{
final int currentNumber = getDynamicConfigs().getBalancerComputeThreads();
// Reset balancerExecutor if required
if (balancerExec == null) {
balancerExec = createNewBalancerExecutor(currentNumber);
} else if (cachedBalancerThreadNumber != currentNumber) {
balancerExec = createNewBalancerExecutor(balancerComputeThreads);
} else if (cachedBalancerThreadNumber != balancerComputeThreads) {
log.info(
"balancerComputeThreads has changed from [%d] to [%d], recreating the thread pool.",
cachedBalancerThreadNumber,
currentNumber
"'balancerComputeThreads' has changed from [%d] to [%d]",
cachedBalancerThreadNumber, balancerComputeThreads
);
balancerExec.shutdownNow();
balancerExec = createNewBalancerExecutor(currentNumber);
balancerExec = createNewBalancerExecutor(balancerComputeThreads);
}
// Create BalancerStrategy
final BalancerStrategy balancerStrategy = balancerStrategyFactory.createBalancerStrategy(balancerExec);
log.info(
"Using balancer strategy[%s] with [%d] threads.",
balancerStrategy.getClass().getSimpleName(), balancerComputeThreads
);
return balancerStrategy;
}
private ListeningExecutorService createNewBalancerExecutor(int numThreads)
{
log.info("Creating new balancer executor with [%d] threads.", numThreads);
cachedBalancerThreadNumber = numThreads;
return MoreExecutors.listeningDecorator(
Execs.multiThreaded(numThreads, "coordinator-cost-balancer-%s")
@ -576,7 +587,7 @@ public class DruidCoordinator
new UpdateReplicationStatus(),
new UnloadUnusedSegments(loadQueueManager),
new MarkOvershadowedSegmentsAsUnused(segmentsMetadataManager::markSegmentsAsUnused),
new BalanceSegments(),
new BalanceSegments(config.getCoordinatorPeriod()),
new CollectSegmentAndServerStats(DruidCoordinator.this)
);
}
@ -750,7 +761,7 @@ public class DruidCoordinator
);
log.info(
"Emitted [%d] stats for group [%s]. All collected stats:%s\n",
"Emitted [%d] stats for group [%s]. All collected stats:%s",
emittedCount.get(), dutyGroupName, allStats.buildStatsTable()
);
}
@ -758,7 +769,7 @@ public class DruidCoordinator
// Emit the runtime of the full DutiesRunnable
final long runMillis = groupRunTime.stop().elapsed(TimeUnit.MILLISECONDS);
emitStat(Stats.CoordinatorRun.GROUP_RUN_TIME, Collections.emptyMap(), runMillis);
log.info("Finished coordinator run for group [%s] in [%d] ms", dutyGroupName, runMillis);
log.info("Finished coordinator run for group [%s] in [%d] ms.%n", dutyGroupName, runMillis);
}
catch (Exception e) {
log.makeAlert(e, "Caught exception, ignoring so that schedule keeps going.").emit();
@ -818,15 +829,8 @@ public class DruidCoordinator
final DruidCluster cluster = prepareCluster(dynamicConfig, segmentLoadingConfig, currentServers);
cancelLoadsOnDecommissioningServers(cluster);
initBalancerExecutor();
final BalancerStrategy balancerStrategy = balancerStrategyFactory.createBalancerStrategy(balancerExec);
log.info(
"Using balancer strategy [%s] with round-robin assignment [%s] and debug dimensions [%s].",
balancerStrategy.getClass().getSimpleName(),
segmentLoadingConfig.isUseRoundRobinSegmentAssignment(),
dynamicConfig.getDebugDimensions()
);
final BalancerStrategy balancerStrategy
= createBalancerStrategy(segmentLoadingConfig.getBalancerComputeThreads());
return params.buildFromExisting()
.withDruidCluster(cluster)
.withBalancerStrategy(balancerStrategy)

View File

@ -33,18 +33,37 @@ import java.util.Map;
*/
public class SegmentCountsPerInterval
{
private final Map<String, Object2IntMap<Interval>>
datasourceIntervalToSegmentCount = new HashMap<>();
private int totalSegments;
private long totalSegmentBytes;
private final Map<String, Object2IntMap<Interval>> datasourceIntervalToSegmentCount = new HashMap<>();
private final Object2IntMap<Interval> intervalToTotalSegmentCount = new Object2IntOpenHashMap<>();
private final Object2IntMap<String> datasourceToTotalSegmentCount = new Object2IntOpenHashMap<>();
public void addSegment(DataSegment segment)
{
updateCountInInterval(segment, 1);
totalSegmentBytes += segment.getSize();
}
public void removeSegment(DataSegment segment)
{
updateCountInInterval(segment, -1);
totalSegmentBytes -= segment.getSize();
}
public int getTotalSegmentCount()
{
return totalSegments;
}
public long getTotalSegmentBytes()
{
return totalSegmentBytes;
}
public Object2IntMap<String> getDatasourceToTotalSegmentCount()
{
return datasourceToTotalSegmentCount;
}
public Object2IntMap<Interval> getIntervalToSegmentCount(String datasource)
@ -59,7 +78,9 @@ public class SegmentCountsPerInterval
private void updateCountInInterval(DataSegment segment, int delta)
{
totalSegments += delta;
intervalToTotalSegmentCount.mergeInt(segment.getInterval(), delta, Integer::sum);
datasourceToTotalSegmentCount.mergeInt(segment.getDataSource(), delta, Integer::sum);
datasourceIntervalToSegmentCount
.computeIfAbsent(segment.getDataSource(), ds -> new Object2IntOpenHashMap<>())
.mergeInt(segment.getInterval(), delta, Integer::sum);

View File

@ -181,6 +181,23 @@ public class ServerHolder implements Comparable<ServerHolder>
return server.getMaxSize();
}
/**
* Total projected disk usage of this server in bytes.
* <p>
* The total size:
* <ol>
* <li>INCLUDES segments loaded on this server</li>
* <li>INCLUDES segments loading on this server (actions: LOAD/REPLICATE)</li>
* <li>INCLUDES segments moving to this server (action: MOVE_TO)</li>
* <li>INCLUDES segments moving from this server (action: MOVE_FROM). This is
* because these segments have only been <i>marked</i> for drop. We include
* the size of these segments to avoid over-assigning the server in case the
* corresponding MOVE_TO operation gets delayed or fails.</li>
* <li>EXCLUDES segments dropping from this server (action: DROP). Excluding
* these segments cannot result in over-assignment because drops are always
* processed before loads.</li>
* </ol>
*/
public long getSizeUsed()
{
return server.getCurrSize() + sizeOfLoadingSegments - sizeOfDroppingSegments;
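// Worked example (illustrative numbers): with 500 GB currently loaded, 20 GB
// queued as LOAD/REPLICATE/MOVE_TO and 10 GB queued as DROP, the projected
// usage is 500 + 20 - 10 = 510 GB. Segments marked MOVE_FROM stay counted
// inside the loaded 500 GB until the move actually completes.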
@ -317,6 +334,11 @@ public class ServerHolder implements Comparable<ServerHolder>
return loadingReplicaCount;
}
public int getNumQueuedSegments()
{
return queuedSegments.size();
}
public boolean startOperation(SegmentAction action, DataSegment segment)
{
if (queuedSegments.containsKey(segment)) {
@ -349,7 +371,7 @@ public class ServerHolder implements Comparable<ServerHolder>
}
}
public boolean hasSegmentLoaded(SegmentId segmentId)
private boolean hasSegmentLoaded(SegmentId segmentId)
{
return server.getSegment(segmentId) != null;
}

View File

@ -72,8 +72,7 @@ public interface BalancerStrategy
Iterator<ServerHolder> findServersToDropSegment(DataSegment segmentToDrop, List<ServerHolder> serverHolders);
/**
* Returns the stats collected by the strategy in the current run and resets
* the stats collector for the next run.
* Returns the stats collected by the strategy.
*/
CoordinatorRunStats getAndResetStats();
CoordinatorRunStats getStats();
}

View File

@ -46,6 +46,7 @@ import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
public class CostBalancerStrategy implements BalancerStrategy
@ -68,6 +69,7 @@ public class CostBalancerStrategy implements BalancerStrategy
.thenComparing(pair -> pair.rhs);
private final CoordinatorRunStats stats = new CoordinatorRunStats();
private final AtomicLong computeTimeNanos = new AtomicLong(0);
public static double computeJointSegmentsCost(DataSegment segment, Iterable<DataSegment> segmentSet)
{
@ -263,9 +265,13 @@ public class CostBalancerStrategy implements BalancerStrategy
}
@Override
public CoordinatorRunStats getAndResetStats()
public CoordinatorRunStats getStats()
{
return stats.getSnapshotAndReset();
stats.add(
Stats.Balancer.COMPUTATION_TIME,
TimeUnit.NANOSECONDS.toMillis(computeTimeNanos.getAndSet(0))
);
return stats;
}
/**
@ -351,8 +357,8 @@ public class CostBalancerStrategy implements BalancerStrategy
// Report computation stats
computeTime.stop();
stats.add(Stats.Balancer.COMPUTATION_COUNT, metricKey, 1);
stats.add(Stats.Balancer.COMPUTATION_TIME, metricKey, computeTime.elapsed(TimeUnit.MILLISECONDS));
stats.add(Stats.Balancer.COMPUTATION_COUNT, 1);
computeTimeNanos.addAndGet(computeTime.elapsed(TimeUnit.NANOSECONDS));
return costPrioritizedServers.stream().map(pair -> pair.rhs)
.collect(Collectors.toList());

View File

@ -73,7 +73,7 @@ public class RandomBalancerStrategy implements BalancerStrategy
}
@Override
public CoordinatorRunStats getAndResetStats()
public CoordinatorRunStats getStats()
{
return CoordinatorRunStats.empty();
}

View File

@ -0,0 +1,307 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.balancer;
import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.SegmentCountsPerInterval;
import org.apache.druid.server.coordinator.ServerHolder;
import org.joda.time.Duration;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
/**
* Calculates the maximum, minimum and required number of segments to move in a
* Coordinator run for balancing.
*/
public class SegmentToMoveCalculator
{
/**
* At least this number of segments must be picked for moving in every cycle
* to keep the cluster well balanced.
*/
private static final int MIN_SEGMENTS_TO_MOVE = 100;
private static final Logger log = new Logger(SegmentToMoveCalculator.class);
/**
* Calculates the number of segments to be picked for moving in the given tier,
* based on the level of skew between the historicals in the tier.
*
* @param tier Name of tier used for logging purposes
* @param historicals Active historicals in tier
* @param maxSegmentsToMoveInTier Maximum number of segments allowed to be moved
* in the tier.
* @return Number of segments to move in the tier in the range
* [{@link #MIN_SEGMENTS_TO_MOVE}, {@code maxSegmentsToMoveInTier}].
*/
public static int computeNumSegmentsToMoveInTier(
String tier,
List<ServerHolder> historicals,
int maxSegmentsToMoveInTier
)
{
final int totalSegments = historicals.stream().mapToInt(
server -> server.getProjectedSegments().getTotalSegmentCount()
).sum();
// Move at least some segments to ensure that the cluster is always balancing itself
final int minSegmentsToMove = SegmentToMoveCalculator
.computeMinSegmentsToMoveInTier(totalSegments);
final int segmentsToMoveToFixDeviation = SegmentToMoveCalculator
.computeNumSegmentsToMoveToBalanceTier(tier, historicals);
log.info(
"Need to move [%,d] segments in tier[%s] to attain balance. Allowed values are [min=%d, max=%d].",
segmentsToMoveToFixDeviation, tier, minSegmentsToMove, maxSegmentsToMoveInTier
);
final int activeSegmentsToMove = Math.max(minSegmentsToMove, segmentsToMoveToFixDeviation);
return Math.min(activeSegmentsToMove, maxSegmentsToMoveInTier);
}
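// For example (illustrative values, consistent with the tests later in this
// diff): with a floor of minSegmentsToMove = 100, a computed skew of 5,000
// segments and a cap of 2,000, this returns min(max(100, 5_000), 2_000) = 2,000;
// with no skew at all it still returns the floor of 100, so the tier keeps
// rebalancing itself.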
/**
* Calculates the minimum number of segments that should be considered for
* moving in a tier, so that the cluster is always balancing itself.
* <p>
* This value must be calculated separately for every tier.
*
* @param totalSegmentsInTier Total number of all replicas of all segments
* loaded or queued across all historicals in the tier.
* @return {@code minSegmentsToMoveInTier} in the range
* [{@link #MIN_SEGMENTS_TO_MOVE}, {@code ~0.15% of totalSegmentsInTier}].
*/
public static int computeMinSegmentsToMoveInTier(int totalSegmentsInTier)
{
// Divide by 2^16 and multiply by 100 so that the value increases
// in steps of 100 for every 2^16 = ~65k segments
int upperBound = (totalSegmentsInTier >> 16) * 100;
int lowerBound = Math.min(MIN_SEGMENTS_TO_MOVE, totalSegmentsInTier);
return Math.max(lowerBound, upperBound);
}
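// Usage sketch (return values match SegmentToMoveCalculatorTest below):
//   computeMinSegmentsToMoveInTier(50)        = 50     (tier smaller than the floor)
//   computeMinSegmentsToMoveInTier(100_000)   = 100    (floor of MIN_SEGMENTS_TO_MOVE)
//   computeMinSegmentsToMoveInTier(131_072)   = 200    (first 2^16 step)
//   computeMinSegmentsToMoveInTier(1_000_000) = 1_500  (~0.15%)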
/**
* Calculates the maximum number of segments that can be picked for moving in
* the cluster in a single coordinator run.
* <p>
* This value must be calculated at the cluster level and then applied
* to every tier so that the total computation time is estimated correctly.
* <p>
* Each balancer thread can perform 1 billion computations in 20s (see #14584).
* Therefore, keeping a buffer of 10s, in every 30s:
* <pre>
* numComputations = maxSegmentsToMove * totalSegments
*
* maxSegmentsToMove = numComputations / totalSegments
* = (nThreads * 1B) / totalSegments
* </pre>
*
* @param totalSegments Total number of all replicas of all segments loaded or
* queued across all historicals in the cluster.
* @return {@code maxSegmentsToMove} per tier in the range
* [{@link #MIN_SEGMENTS_TO_MOVE}, ~20% of totalSegments].
* @see <a href="https://github.com/apache/druid/pull/14584">#14584</a>
*/
public static int computeMaxSegmentsToMovePerTier(
int totalSegments,
int numBalancerThreads,
Duration coordinatorPeriod
)
{
Preconditions.checkArgument(
numBalancerThreads > 0 && numBalancerThreads <= 100,
"Number of balancer threads must be in range (0, 100]."
);
if (totalSegments <= 0) {
return 0;
}
// Divide by 2^9 and multiply by 100 so that the upperBound
// increases in steps of 100 for every 2^9 = 512 segments (~20%)
final int upperBound = (totalSegments >> 9) * 100;
final int lowerBound = MIN_SEGMENTS_TO_MOVE;
int num30sPeriods = Math.min(4, (int) (coordinatorPeriod.getMillis() / 30_000));
// Each thread can do ~1B computations in 30s = 1M * 1k = 2^20 * 1k
int maxComputationsInThousands = (numBalancerThreads * num30sPeriods) << 20;
int maxSegmentsToMove = (maxComputationsInThousands / totalSegments) * 1000;
if (upperBound < lowerBound) {
return Math.min(lowerBound, totalSegments);
} else {
return Math.min(maxSegmentsToMove, upperBound);
}
}
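// Worked example (numbers mirror SegmentToMoveCalculatorTest below): for
// totalSegments = 1,000,000 with 8 threads and a 60s period,
//   num30sPeriods              = min(4, 60_000 / 30_000)          = 2
//   maxComputationsInThousands = (8 * 2) << 20                    = 16,777,216
//   maxSegmentsToMove          = (16_777_216 / 1_000_000) * 1000  = 16,000
// The ~20% upper bound, (1_000_000 >> 9) * 100 = 195,300, does not bind here.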
/**
* Computes the number of segments that need to be moved across the historicals
* in a tier to attain balance in terms of disk usage and segment counts per
* data source.
*
* @param tier Name of the tier used only for logging purposes
* @param historicals List of historicals in the tier
*/
public static int computeNumSegmentsToMoveToBalanceTier(String tier, List<ServerHolder> historicals)
{
if (historicals.isEmpty()) {
return 0;
}
return Math.max(
computeSegmentsToMoveToBalanceCountsPerDatasource(tier, historicals),
computeSegmentsToMoveToBalanceDiskUsage(tier, historicals)
);
}
private static double getAverageSegmentSize(List<ServerHolder> servers)
{
int totalSegmentCount = 0;
long totalUsageBytes = 0;
for (ServerHolder server : servers) {
totalSegmentCount += server.getProjectedSegments().getTotalSegmentCount();
totalUsageBytes += server.getProjectedSegments().getTotalSegmentBytes();
}
if (totalSegmentCount <= 0 || totalUsageBytes <= 0) {
return 0;
} else {
return (1.0 * totalUsageBytes) / totalSegmentCount;
}
}
/**
* Computes the number of segments to move across the servers of the tier in
* order to balance the segment counts of the most unbalanced datasource.
*/
static int computeSegmentsToMoveToBalanceCountsPerDatasource(
String tier,
List<ServerHolder> servers
)
{
// Find all the datasources
final Set<String> datasources = servers.stream().flatMap(
s -> s.getProjectedSegments().getDatasourceToTotalSegmentCount().keySet().stream()
).collect(Collectors.toSet());
if (datasources.isEmpty()) {
return 0;
}
// Compute the min and max number of segments for each datasource
final Object2IntMap<String> datasourceToMaxSegments = new Object2IntOpenHashMap<>();
final Object2IntMap<String> datasourceToMinSegments = new Object2IntOpenHashMap<>();
for (ServerHolder server : servers) {
final Object2IntMap<String> datasourceToSegmentCount
= server.getProjectedSegments().getDatasourceToTotalSegmentCount();
for (String datasource : datasources) {
int count = datasourceToSegmentCount.getInt(datasource);
datasourceToMaxSegments.mergeInt(datasource, count, Math::max);
datasourceToMinSegments.mergeInt(datasource, count, Math::min);
}
}
// Compute the gap between min and max for each datasource and order by largest first
final TreeMap<Integer, String> countDiffToDatasource = new TreeMap<>(Comparator.reverseOrder());
datasourceToMaxSegments.object2IntEntrySet().forEach(entry -> {
String datasource = entry.getKey();
int maxCount = entry.getIntValue();
int minCount = datasourceToMinSegments.getInt(datasource);
countDiffToDatasource.put(maxCount - minCount, datasource);
});
// Identify the most unbalanced datasource
final Map.Entry<Integer, String> maxCountDifference = countDiffToDatasource.firstEntry();
String mostUnbalancedDatasource = maxCountDifference.getValue();
int minNumSegments = Integer.MAX_VALUE;
int maxNumSegments = 0;
for (ServerHolder server : servers) {
int countForSkewedDatasource = server.getProjectedSegments()
.getDatasourceToTotalSegmentCount()
.getInt(mostUnbalancedDatasource);
minNumSegments = Math.min(minNumSegments, countForSkewedDatasource);
maxNumSegments = Math.max(maxNumSegments, countForSkewedDatasource);
}
final int numSegmentsToMove = maxCountDifference.getKey() / 2;
if (numSegmentsToMove > 0) {
log.info(
"Need to move [%,d] segments of datasource[%s] in tier[%s] to fix gap between min[%,d] and max[%,d].",
numSegmentsToMove, mostUnbalancedDatasource, tier, minNumSegments, maxNumSegments
);
}
return numSegmentsToMove;
}
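// For instance, in the wiki/koala test scenario below, server A holds all
// 10,000 wiki segments and server B none, so wiki is the most unbalanced
// datasource with a gap of 10,000, and this returns 10_000 / 2 = 5,000.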
private static int computeSegmentsToMoveToBalanceDiskUsage(
String tier,
List<ServerHolder> servers
)
{
if (servers.isEmpty()) {
return 0;
}
double maxUsagePercent = 0.0;
double minUsagePercent = 100.0;
long maxUsageBytes = 0;
long minUsageBytes = Long.MAX_VALUE;
for (ServerHolder server : servers) {
final SegmentCountsPerInterval projectedSegments = server.getProjectedSegments();
// Track the maximum and minimum values
long serverUsageBytes = projectedSegments.getTotalSegmentBytes();
maxUsageBytes = Math.max(serverUsageBytes, maxUsageBytes);
minUsageBytes = Math.min(serverUsageBytes, minUsageBytes);
double diskUsage = server.getMaxSize() <= 0
? 0 : (100.0 * projectedSegments.getTotalSegmentBytes()) / server.getMaxSize();
maxUsagePercent = Math.max(diskUsage, maxUsagePercent);
minUsagePercent = Math.min(diskUsage, minUsagePercent);
}
final double averageSegmentSize = getAverageSegmentSize(servers);
final long differenceInUsageBytes = maxUsageBytes - minUsageBytes;
final int numSegmentsToMove = averageSegmentSize <= 0
? 0 : (int) (differenceInUsageBytes / averageSegmentSize) / 2;
if (numSegmentsToMove > 0) {
log.info(
"Need to move [%,d] segments of avg size [%,d MB] in tier[%s] to fix"
+ " disk usage gap between min[%d GB][%.1f%%] and max[%d GB][%.1f%%].",
numSegmentsToMove, ((long) averageSegmentSize) >> 20, tier,
minUsageBytes >> 30, minUsagePercent, maxUsageBytes >> 30, maxUsagePercent
);
}
return numSegmentsToMove;
}
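// For example (illustrative numbers): a 100 GB usage gap between the fullest
// and the emptiest server with an average segment size of 500 MB yields
// (102_400 / 500) / 2 = 102 segments, i.e. moving half the byte gap levels
// the two servers.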
private SegmentToMoveCalculator()
{
// no instantiation
}
}

View File

@ -21,9 +21,9 @@ package org.apache.druid.server.coordinator.balancer;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
@ -55,27 +55,23 @@ public class TierSegmentBalancer
private final DruidCoordinatorRuntimeParams params;
private final StrategicSegmentAssigner segmentAssigner;
private final SegmentLoadingConfig loadingConfig;
private final CoordinatorRunStats runStats;
private final List<ServerHolder> activeServers;
private final List<ServerHolder> decommissioningServers;
private final int totalMaxSegmentsToMove;
private final int maxSegmentsToMove;
private final int movingSegmentCount;
public TierSegmentBalancer(
String tier,
Set<ServerHolder> servers,
int maxSegmentsToMove,
DruidCoordinatorRuntimeParams params
)
{
this.tier = tier;
this.params = params;
this.segmentAssigner = params.getSegmentAssigner();
this.loadingConfig = params.getSegmentLoadingConfig();
this.totalMaxSegmentsToMove = loadingConfig.getMaxSegmentsToMove();
this.runStats = segmentAssigner.getStats();
Map<Boolean, List<ServerHolder>> partitions =
@ -84,55 +80,29 @@ public class TierSegmentBalancer
this.activeServers = partitions.get(false);
this.movingSegmentCount = activeServers.stream().mapToInt(ServerHolder::getNumMovingSegments).sum();
this.maxSegmentsToMove = maxSegmentsToMove;
}
public void run()
{
if (activeServers.isEmpty() || (activeServers.size() <= 1 && decommissioningServers.isEmpty())) {
log.warn(
"Skipping balance for tier [%s] with [%d] active servers and [%d] decomissioning servers.",
tier, activeServers.size(), decommissioningServers.size()
);
return;
}
int numDecommSegmentsToMove = getNumDecommSegmentsToMove(maxSegmentsToMove);
moveSegmentsFrom(decommissioningServers, numDecommSegmentsToMove, "decommissioning");
log.info(
"Moving max [%d] segments in tier [%s] with [%d] active servers and"
+ " [%d] decommissioning servers. There are [%d] segments already in queue.",
totalMaxSegmentsToMove, tier, activeServers.size(), decommissioningServers.size(), movingSegmentCount
);
// Move segments from decommissioning to active servers
int movedDecommSegments = 0;
if (!decommissioningServers.isEmpty()) {
int maxDecommPercentToMove = loadingConfig.getPercentDecommSegmentsToMove();
int maxDecommSegmentsToMove = (int) Math.ceil(totalMaxSegmentsToMove * (maxDecommPercentToMove / 100.0));
movedDecommSegments +=
moveSegmentsFromTo(decommissioningServers, activeServers, maxDecommSegmentsToMove);
log.info(
"Moved [%d] segments out of max [%d (%d%%)] from decommissioning to active servers in tier [%s].",
movedDecommSegments, maxDecommSegmentsToMove, maxDecommPercentToMove, tier
);
}
// Move segments across active servers
int maxGeneralSegmentsToMove = totalMaxSegmentsToMove - movedDecommSegments;
int movedGeneralSegments =
moveSegmentsFromTo(activeServers, activeServers, maxGeneralSegmentsToMove);
log.info(
"Moved [%d] segments out of max [%d] between active servers in tier [%s].",
movedGeneralSegments, maxGeneralSegmentsToMove, tier
);
int numActiveSegmentsToMove = getNumActiveSegmentsToMove(maxSegmentsToMove - numDecommSegmentsToMove);
moveSegmentsFrom(activeServers, numActiveSegmentsToMove, "active");
}
private int moveSegmentsFromTo(
List<ServerHolder> sourceServers,
List<ServerHolder> destServers,
int maxSegmentsToMove
/**
* Moves segments from the given source servers to the active servers in this tier.
*/
private void moveSegmentsFrom(
final List<ServerHolder> sourceServers,
final int numSegmentsToMove,
final String sourceServerType
)
{
if (maxSegmentsToMove <= 0 || sourceServers.isEmpty() || destServers.isEmpty()) {
return 0;
if (numSegmentsToMove <= 0 || sourceServers.isEmpty() || activeServers.isEmpty()) {
return;
}
final Set<String> broadcastDatasources = params.getBroadcastDatasources();
@ -140,25 +110,30 @@ public class TierSegmentBalancer
// Always move loading segments first as it is a cheaper operation
List<BalancerSegmentHolder> pickedSegments = ReservoirSegmentSampler.pickMovableSegmentsFrom(
sourceServers,
maxSegmentsToMove,
numSegmentsToMove,
ServerHolder::getLoadingSegments,
broadcastDatasources
);
int movedCount = moveSegmentsTo(destServers, pickedSegments, maxSegmentsToMove);
int movedCount = moveSegmentsTo(activeServers, pickedSegments, numSegmentsToMove);
// Move loaded segments only if tier is not already busy moving segments
if (movingSegmentCount <= 0) {
maxSegmentsToMove -= movedCount;
int numLoadedSegmentsToMove = numSegmentsToMove - movedCount;
pickedSegments = ReservoirSegmentSampler.pickMovableSegmentsFrom(
sourceServers,
maxSegmentsToMove,
numLoadedSegmentsToMove,
server -> server.getServer().iterateAllSegments(),
broadcastDatasources
);
movedCount += moveSegmentsTo(destServers, pickedSegments, maxSegmentsToMove);
movedCount += moveSegmentsTo(activeServers, pickedSegments, numLoadedSegmentsToMove);
} else {
log.info("There are already [%,d] segments moving in tier[%s].", movingSegmentCount, tier);
}
return movedCount;
log.info(
"Moved [%,d of %,d] segments from [%d] [%s] servers in tier [%s].",
movedCount, numSegmentsToMove, sourceServers.size(), sourceServerType, tier
);
}
private int moveSegmentsTo(
@ -221,4 +196,39 @@ public class TierSegmentBalancer
runStats.add(Stats.Segments.MOVE_SKIPPED, key, 1);
}
/**
* Number of segments to move away from the decommissioning historicals of this tier.
*/
private int getNumDecommSegmentsToMove(int maxSegmentsToMove)
{
final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig();
if (decommissioningServers.isEmpty() || activeServers.isEmpty()) {
return 0;
} else if (dynamicConfig.isSmartSegmentLoading()) {
final int decommSegmentsToMove = decommissioningServers.stream().mapToInt(
server -> server.getProjectedSegments().getTotalSegmentCount()
).sum();
return Math.min(decommSegmentsToMove, maxSegmentsToMove);
} else {
int maxPercentageToMove = dynamicConfig.getDecommissioningMaxPercentOfMaxSegmentsToMove();
return (int) Math.ceil(maxSegmentsToMove * (maxPercentageToMove / 100.0));
}
}
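// For example (illustrative): with smart loading enabled, 3,000 segments on
// decommissioning servers and maxSegmentsToMove = 2,000, all 2,000 moves are
// spent draining the decommissioning servers, leaving 0 for balancing the
// active servers in this run.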
/**
* Number of segments to move between the active historicals of this tier.
*/
private int getNumActiveSegmentsToMove(int maxActiveSegmentsToMove)
{
final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig();
if (activeServers.size() < 2) {
return 0;
} else if (dynamicConfig.isSmartSegmentLoading()) {
return SegmentToMoveCalculator.computeNumSegmentsToMoveInTier(tier, activeServers, maxActiveSegmentsToMove);
} else {
// If smartSegmentLoading is disabled, just use the configured value
return maxActiveSegmentsToMove;
}
}
}

View File

@ -19,12 +19,19 @@
package org.apache.druid.server.coordinator.duty;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.balancer.SegmentToMoveCalculator;
import org.apache.druid.server.coordinator.balancer.TierSegmentBalancer;
import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.joda.time.Duration;
import java.util.Set;
/**
*
@ -33,6 +40,13 @@ public class BalanceSegments implements CoordinatorDuty
{
private static final EmittingLogger log = new EmittingLogger(BalanceSegments.class);
private final Duration coordinatorPeriod;
public BalanceSegments(Duration coordinatorPeriod)
{
this.coordinatorPeriod = coordinatorPeriod;
}
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
@ -43,27 +57,79 @@ public class BalanceSegments implements CoordinatorDuty
final DruidCluster cluster = params.getDruidCluster();
final SegmentLoadingConfig loadingConfig = params.getSegmentLoadingConfig();
final int maxSegmentsToMove = loadingConfig.getMaxSegmentsToMove();
final int maxSegmentsToMove = getMaxSegmentsToMove(params);
if (maxSegmentsToMove <= 0) {
log.info("Skipping balance as maxSegmentsToMove is [%d].", maxSegmentsToMove);
return params;
} else {
log.info(
"Balancing segments in tiers [%s] with maxSegmentsToMove=[%d], maxLifetime=[%d].",
"Balancing segments in tiers [%s] with maxSegmentsToMove[%,d] and maxLifetime[%d].",
cluster.getTierNames(), maxSegmentsToMove, loadingConfig.getMaxLifetimeInLoadQueue()
);
}
cluster.getHistoricals().forEach(
(tier, servers) -> new TierSegmentBalancer(tier, servers, params).run()
(tier, servers) -> new TierSegmentBalancer(tier, servers, maxSegmentsToMove, params).run()
);
CoordinatorRunStats runStats = params.getCoordinatorStats();
params.getBalancerStrategy()
.getAndResetStats()
.getStats()
.forEachStat(runStats::add);
return params;
}
/**
* Recomputes the value of {@code maxSegmentsToMove} if smart segment loading
* is enabled. {@code maxSegmentsToMove} defines only the upper bound, the actual
* number of segments picked for moving is determined by the {@link TierSegmentBalancer}
* based on the level of skew in the tier.
*/
private int getMaxSegmentsToMove(DruidCoordinatorRuntimeParams params)
{
final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig();
if (dynamicConfig.isSmartSegmentLoading()) {
final Pair<Integer, Integer> numHistoricalsAndSegments = getNumHistoricalsAndSegments(params.getDruidCluster());
final int totalSegmentsInCluster = numHistoricalsAndSegments.rhs;
final int numBalancerThreads = params.getSegmentLoadingConfig().getBalancerComputeThreads();
final int maxSegmentsToMove = SegmentToMoveCalculator
.computeMaxSegmentsToMovePerTier(totalSegmentsInCluster, numBalancerThreads, coordinatorPeriod);
log.info(
"Computed maxSegmentsToMove[%,d] for total [%,d] segments on [%d] historicals.",
maxSegmentsToMove, totalSegmentsInCluster, numHistoricalsAndSegments.lhs
);
return maxSegmentsToMove;
} else {
return dynamicConfig.getMaxSegmentsToMove();
}
}
/**
* Calculates the total number of historicals (active and decommissioning) and
* the total number of segments on these historicals that would participate in
* cost computations. This includes all replicas of all loaded, loading, dropping
* and moving segments.
* <p>
* This is calculated here to ensure that all assignments done by the preceding
* {@link RunRules} duty are accounted for.
*/
private Pair<Integer, Integer> getNumHistoricalsAndSegments(DruidCluster cluster)
{
int numHistoricals = 0;
int numSegments = 0;
for (Set<ServerHolder> historicals : cluster.getHistoricals().values()) {
for (ServerHolder historical : historicals) {
++numHistoricals;
numSegments += historical.getServer().getNumSegments() + historical.getNumQueuedSegments();
}
}
return Pair.of(numHistoricals, numSegments);
}
}

View File

@ -144,8 +144,8 @@ public class CollectSegmentAndServerStats implements CoordinatorDuty
final int numHistoricals = historicals.size();
log.info(
"Tier [%s] is serving [%,d], loading [%,d] and dropping [%,d] segments"
+ " across [%d] historicals with average usage [%d GBs], [%.1f%%].",
"Tier[%s] is serving [%,d], loading [%,d] and dropping [%,d] segments"
+ " across [%d] historicals with average usage[%d GBs], [%.1f%%].",
tier, servedCount.get(), loadingCount.get(), droppingCount.get(), numHistoricals,
(currentBytesSum.get() >> 30) / numHistoricals, usageSum.get() / numHistoricals
);

View File

@ -97,7 +97,7 @@ public class UnloadUnusedSegments implements CoordinatorDuty
&& loadQueueManager.dropSegment(segment, serverHolder)) {
totalUnneededCount++;
log.debug(
"Dropping uneeded segment[%s] from server[%s] in tier[%s]",
"Dropping uneeded segment[%s] from server[%s] in tier[%s].",
segment.getId(), server.getName(), server.getTier()
);
}

View File

@ -35,33 +35,27 @@ public class SegmentLoadingConfig
private final int maxReplicaAssignmentsInRun;
private final int maxLifetimeInLoadQueue;
private final int maxSegmentsToMove;
private final int percentDecommSegmentsToMove;
private final int balancerComputeThreads;
private final boolean useRoundRobinSegmentAssignment;
/**
* Creates a new SegmentLoadingConfig with recomputed coordinator config values from
* Creates a new SegmentLoadingConfig with recomputed coordinator config values
* based on whether {@link CoordinatorDynamicConfig#isSmartSegmentLoading()}
* is enabled or not.
*/
public static SegmentLoadingConfig create(CoordinatorDynamicConfig dynamicConfig, int numUsedSegments)
{
if (dynamicConfig.isSmartSegmentLoading()) {
// Compute recommended values
// Impose a lower bound on both replicationThrottleLimit and maxSegmentsToMove
// Compute replicationThrottleLimit with a lower bound of 100
final int throttlePercentage = 2;
final int replicationThrottleLimit = Math.max(100, numUsedSegments * throttlePercentage / 100);
// Impose an upper bound on maxSegmentsToMove to ensure that coordinator
// run times are bounded. This limit can be relaxed as performance of
// the CostBalancerStrategy.computeCost() is improved.
final int maxSegmentsToMove = Math.min(1000, replicationThrottleLimit);
final int balancerComputeThreads = computeNumBalancerThreads(numUsedSegments);
log.info(
"Smart segment loading is enabled. Recomputed replicationThrottleLimit"
+ " [%,d] (%d%% of used segments [%,d]) and maxSegmentsToMove [%,d].",
replicationThrottleLimit, throttlePercentage, numUsedSegments, maxSegmentsToMove
"Smart segment loading is enabled. Calculated balancerComputeThreads[%d]"
+ " and replicationThrottleLimit[%,d] (%d%% of used segments[%,d]).",
balancerComputeThreads, replicationThrottleLimit, throttlePercentage, numUsedSegments
);
return new SegmentLoadingConfig(
@ -69,9 +63,8 @@ public class SegmentLoadingConfig
replicationThrottleLimit,
Integer.MAX_VALUE,
60,
maxSegmentsToMove,
100,
true
true,
balancerComputeThreads
);
} else {
// Use the configured values
@ -80,9 +73,8 @@ public class SegmentLoadingConfig
dynamicConfig.getReplicationThrottleLimit(),
dynamicConfig.getMaxNonPrimaryReplicantsToLoad(),
dynamicConfig.getReplicantLifetime(),
dynamicConfig.getMaxSegmentsToMove(),
dynamicConfig.getDecommissioningMaxPercentOfMaxSegmentsToMove(),
dynamicConfig.isUseRoundRobinSegmentAssignment()
dynamicConfig.isUseRoundRobinSegmentAssignment(),
dynamicConfig.getBalancerComputeThreads()
);
}
}
@ -92,18 +84,16 @@ public class SegmentLoadingConfig
int replicationThrottleLimit,
int maxReplicaAssignmentsInRun,
int maxLifetimeInLoadQueue,
int maxSegmentsToMove,
int percentDecommSegmentsToMove,
boolean useRoundRobinSegmentAssignment
boolean useRoundRobinSegmentAssignment,
int balancerComputeThreads
)
{
this.maxSegmentsInLoadQueue = maxSegmentsInLoadQueue;
this.replicationThrottleLimit = replicationThrottleLimit;
this.maxReplicaAssignmentsInRun = maxReplicaAssignmentsInRun;
this.maxLifetimeInLoadQueue = maxLifetimeInLoadQueue;
this.maxSegmentsToMove = maxSegmentsToMove;
this.percentDecommSegmentsToMove = percentDecommSegmentsToMove;
this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment;
this.balancerComputeThreads = balancerComputeThreads;
}
public int getMaxSegmentsInLoadQueue()
@ -111,11 +101,6 @@ public class SegmentLoadingConfig
return maxSegmentsInLoadQueue;
}
public int getMaxSegmentsToMove()
{
return maxSegmentsToMove;
}
public int getReplicationThrottleLimit()
{
return replicationThrottleLimit;
@ -136,8 +121,35 @@ public class SegmentLoadingConfig
return maxReplicaAssignmentsInRun;
}
public int getPercentDecommSegmentsToMove()
public int getBalancerComputeThreads()
{
return percentDecommSegmentsToMove;
return balancerComputeThreads;
}
/**
* Computes the number of threads to be used in the balancing executor.
* The number of used segments in a cluster is generally a good indicator of
* the cluster size and has been used here as a proxy for the actual number of
* segments that would be involved in cost computations.
* <p>
* The number of threads increases by 1 for every 50k segments at first, then
* for every 75k segments, and so on, with the step widening as the cluster grows.
*
* @return Number of {@code balancerComputeThreads} in the range [1, 8].
*/
public static int computeNumBalancerThreads(int numUsedSegments)
{
// Add an extra thread when numUsedSegments increases by a step
final int[] stepValues = {50, 50, 75, 75, 100, 100, 150, 150};
int remainder = numUsedSegments / 1000;
for (int step = 0; step < stepValues.length; ++step) {
remainder -= stepValues[step];
if (remainder < 0) {
return step + 1;
}
}
return stepValues.length;
}
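// Cumulatively, the steps place the thread-count thresholds at 50k, 100k,
// 175k, 250k, 350k, 450k and 600k used segments for 2 to 8 threads
// respectively (see SegmentLoadingConfigTest below).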
}

View File

@ -255,6 +255,8 @@ public class CoordinatorRunStats
{
if (debugDimensions.isEmpty()) {
return false;
} else if (rowKey.getValues().isEmpty()) {
return true;
}
for (Map.Entry<Dimension, String> entry : rowKey.getValues().entrySet()) {

View File

@ -38,6 +38,7 @@ import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Before;
@ -142,7 +143,7 @@ public class BalanceSegmentsProfiler
.withDatabaseRuleManager(manager)
.build();
BalanceSegments tester = new BalanceSegments();
BalanceSegments tester = new BalanceSegments(Duration.standardMinutes(1));
RunRules runner = new RunRules(Set::size);
watch.start();
DruidCoordinatorRuntimeParams balanceParams = tester.run(params);
@ -188,7 +189,7 @@ public class BalanceSegmentsProfiler
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
.withSegmentAssignerUsing(loadQueueManager)
.build();
BalanceSegments tester = new BalanceSegments();
BalanceSegments tester = new BalanceSegments(Duration.standardMinutes(1));
watch.start();
DruidCoordinatorRuntimeParams balanceParams = tester.run(params);
System.out.println(watch.stop());

View File

@ -51,6 +51,7 @@ import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategyFactory;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
@ -585,25 +586,12 @@ public class DruidCoordinatorTest extends CuratorTestBase
@Test
public void testBalancerThreadNumber()
{
CoordinatorDynamicConfig dynamicConfig = EasyMock.createNiceMock(CoordinatorDynamicConfig.class);
EasyMock.expect(dynamicConfig.getBalancerComputeThreads()).andReturn(5).times(2);
EasyMock.expect(dynamicConfig.getBalancerComputeThreads()).andReturn(10).once();
JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
EasyMock.expect(
configManager.watch(
EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
EasyMock.anyObject(Class.class),
EasyMock.anyObject()
)
).andReturn(new AtomicReference<>(dynamicConfig)).anyTimes();
ScheduledExecutorFactory scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class);
EasyMock.replay(configManager, dynamicConfig, scheduledExecutorFactory);
EasyMock.replay(scheduledExecutorFactory);
DruidCoordinator c = new DruidCoordinator(
druidCoordinatorConfig,
configManager,
EasyMock.createNiceMock(JacksonConfigManager.class),
null,
null,
null,
@ -617,7 +605,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
null,
null,
new CoordinatorCustomDutyGroups(ImmutableSet.of()),
null,
new RandomBalancerStrategyFactory(),
null,
null,
null
@ -628,20 +616,20 @@ public class DruidCoordinatorTest extends CuratorTestBase
Assert.assertNull(c.getBalancerExec());
// first initialization
c.initBalancerExecutor();
c.createBalancerStrategy(5);
Assert.assertEquals(5, c.getCachedBalancerThreadNumber());
ListeningExecutorService firstExec = c.getBalancerExec();
Assert.assertNotNull(firstExec);
// second initialization, expect no changes as cachedBalancerThreadNumber is not changed
c.initBalancerExecutor();
c.createBalancerStrategy(5);
Assert.assertEquals(5, c.getCachedBalancerThreadNumber());
ListeningExecutorService secondExec = c.getBalancerExec();
Assert.assertNotNull(secondExec);
Assert.assertSame(firstExec, secondExec);
// third initialization, expect executor recreated as cachedBalancerThreadNumber is changed to 10
c.initBalancerExecutor();
c.createBalancerStrategy(10);
Assert.assertEquals(10, c.getCachedBalancerThreadNumber());
ListeningExecutorService thirdExec = c.getBalancerExec();
Assert.assertNotNull(thirdExec);

View File

@ -193,8 +193,6 @@ public class ServerHolderTest
);
Assert.assertTrue(h1.isServingSegment(SEGMENTS.get(0)));
Assert.assertFalse(h1.isServingSegment(SEGMENTS.get(1)));
Assert.assertTrue(h1.hasSegmentLoaded(SEGMENTS.get(0).getId()));
Assert.assertFalse(h1.hasSegmentLoaded(SEGMENTS.get(1).getId()));
Assert.assertFalse(h1.isLoadQueueFull());
}
}

View File

@ -31,8 +31,6 @@ import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.junit.After;
@ -317,7 +315,7 @@ public class CostBalancerStrategyTest
}
@Test
public void testGetAndResetStats()
public void testGetStats()
{
final ServerHolder serverA = new ServerHolder(
createHistorical().toImmutableDruidServer(),
@ -332,20 +330,13 @@ public class CostBalancerStrategyTest
// Verify that computation stats have been tracked
strategy.findServersToLoadSegment(segment, Arrays.asList(serverA, serverB));
CoordinatorRunStats computeStats = strategy.getAndResetStats();
CoordinatorRunStats computeStats = strategy.getStats();
final RowKey rowKey = RowKey.with(Dimension.DATASOURCE, DS_WIKI)
.with(Dimension.DESCRIPTION, "LOAD")
.and(Dimension.TIER, "hot");
Assert.assertEquals(1L, computeStats.get(Stats.Balancer.COMPUTATION_COUNT, rowKey));
Assert.assertEquals(1L, computeStats.get(Stats.Balancer.COMPUTATION_COUNT));
long computeTime = computeStats.get(Stats.Balancer.COMPUTATION_TIME, rowKey);
long computeTime = computeStats.get(Stats.Balancer.COMPUTATION_TIME);
Assert.assertTrue(computeTime >= 0 && computeTime <= 100);
Assert.assertFalse(computeStats.hasStat(Stats.Balancer.COMPUTATION_ERRORS));
// Verify that stats have been reset
computeStats = strategy.getAndResetStats();
Assert.assertEquals(0, computeStats.rowCount());
}
@Test

View File

@ -0,0 +1,261 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.balancer;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public class SegmentToMoveCalculatorTest
{
private static final Duration DEFAULT_COORDINATOR_PERIOD = Duration.standardMinutes(1);
/**
* 100 days x 100 partitions = 10,000 segments.
*/
private static final List<DataSegment> WIKI_SEGMENTS
= CreateDataSegments.ofDatasource("wiki")
.forIntervals(100, Granularities.DAY)
.withNumPartitions(100)
.eachOfSizeInMb(500);
/**
* 10 days x 1 partition = 10 segments.
*/
private static final List<DataSegment> KOALA_SEGMENTS
= CreateDataSegments.ofDatasource("koala")
.forIntervals(10, Granularities.DAY)
.eachOfSizeInMb(500);
private static final String TIER = "tier1";
@Test
public void testMaxSegmentsToMove1Thread()
{
Assert.assertEquals(0, computeMaxSegmentsToMove(0, 1));
Assert.assertEquals(50, computeMaxSegmentsToMove(50, 1));
Assert.assertEquals(100, computeMaxSegmentsToMove(100, 1));
Assert.assertEquals(100, computeMaxSegmentsToMove(512, 1));
Assert.assertEquals(200, computeMaxSegmentsToMove(1_024, 1));
Assert.assertEquals(300, computeMaxSegmentsToMove(1_536, 1));
Assert.assertEquals(1_900, computeMaxSegmentsToMove(10_000, 1));
Assert.assertEquals(9_700, computeMaxSegmentsToMove(50_000, 1));
Assert.assertEquals(19_500, computeMaxSegmentsToMove(100_000, 1));
Assert.assertEquals(10_000, computeMaxSegmentsToMove(200_000, 1));
Assert.assertEquals(4_000, computeMaxSegmentsToMove(500_000, 1));
Assert.assertEquals(2_000, computeMaxSegmentsToMove(1_000_000, 1));
}
@Test
public void testMaxSegmentsToMoveIncreasesWithCoordinatorPeriod()
{
Assert.assertEquals(5_000, computeMaxSegmentsToMoveInPeriod(200_000, Duration.millis(30_000)));
Assert.assertEquals(10_000, computeMaxSegmentsToMoveInPeriod(200_000, Duration.millis(60_000)));
Assert.assertEquals(15_000, computeMaxSegmentsToMoveInPeriod(200_000, Duration.millis(90_000)));
Assert.assertEquals(20_000, computeMaxSegmentsToMoveInPeriod(200_000, Duration.millis(120_000)));
Assert.assertEquals(2_000, computeMaxSegmentsToMoveInPeriod(500_000, Duration.millis(30_000)));
Assert.assertEquals(4_000, computeMaxSegmentsToMoveInPeriod(500_000, Duration.millis(60_000)));
Assert.assertEquals(6_000, computeMaxSegmentsToMoveInPeriod(500_000, Duration.millis(90_000)));
Assert.assertEquals(8_000, computeMaxSegmentsToMoveInPeriod(500_000, Duration.millis(120_000)));
}
@Test
public void testMaxSegmentsToMove8Threads()
{
Assert.assertEquals(0, computeMaxSegmentsToMove(0, 8));
Assert.assertEquals(50, computeMaxSegmentsToMove(50, 8));
Assert.assertEquals(100, computeMaxSegmentsToMove(100, 8));
Assert.assertEquals(100, computeMaxSegmentsToMove(512, 8));
Assert.assertEquals(200, computeMaxSegmentsToMove(1_024, 8));
Assert.assertEquals(300, computeMaxSegmentsToMove(1_536, 8));
Assert.assertEquals(33_000, computeMaxSegmentsToMove(500_000, 8));
Assert.assertEquals(16_000, computeMaxSegmentsToMove(1_000_000, 8));
Assert.assertEquals(8_000, computeMaxSegmentsToMove(2_000_000, 8));
Assert.assertEquals(3_000, computeMaxSegmentsToMove(5_000_000, 8));
Assert.assertEquals(1_000, computeMaxSegmentsToMove(10_000_000, 8));
}
@Test
public void testMaxSegmentsToMoveWithComputedNumThreads()
{
Assert.assertEquals(1_900, computeNumThreadsAndMaxToMove(10_000));
Assert.assertEquals(9_700, computeNumThreadsAndMaxToMove(50_000));
Assert.assertEquals(19_500, computeNumThreadsAndMaxToMove(100_000));
Assert.assertEquals(39_000, computeNumThreadsAndMaxToMove(200_000));
Assert.assertEquals(29_000, computeNumThreadsAndMaxToMove(500_000));
Assert.assertEquals(16_000, computeNumThreadsAndMaxToMove(1_000_000));
Assert.assertEquals(8_000, computeNumThreadsAndMaxToMove(2_000_000));
Assert.assertEquals(1_000, computeNumThreadsAndMaxToMove(10_000_000));
}
@Test
public void testMinSegmentsToMove()
{
Assert.assertEquals(0, computeMinSegmentsToMove(0));
Assert.assertEquals(50, computeMinSegmentsToMove(50));
Assert.assertEquals(100, computeMinSegmentsToMove(100));
Assert.assertEquals(100, computeMinSegmentsToMove(1_000));
Assert.assertEquals(100, computeMinSegmentsToMove(20_000));
Assert.assertEquals(100, computeMinSegmentsToMove(50_000));
Assert.assertEquals(100, computeMinSegmentsToMove(100_000));
Assert.assertEquals(300, computeMinSegmentsToMove(200_000));
Assert.assertEquals(700, computeMinSegmentsToMove(500_000));
Assert.assertEquals(1_500, computeMinSegmentsToMove(1_000_000));
Assert.assertEquals(15_200, computeMinSegmentsToMove(10_000_000));
}
@Test
public void testMinSegmentsToMoveIncreasesInSteps()
{
Assert.assertEquals(100, computeMinSegmentsToMove(131_071));
Assert.assertEquals(200, computeMinSegmentsToMove(131_072));
Assert.assertEquals(500, computeMinSegmentsToMove(393_215));
Assert.assertEquals(600, computeMinSegmentsToMove(393_216));
Assert.assertEquals(900, computeMinSegmentsToMove(655_359));
Assert.assertEquals(1000, computeMinSegmentsToMove(655_360));
Assert.assertEquals(9_900, computeMinSegmentsToMove(6_553_599));
Assert.assertEquals(10_000, computeMinSegmentsToMove(6_553_600));
}
@Test
public void testMinSegmentsArePickedForMoveWhenNoSkew()
{
final List<ServerHolder> historicals = Arrays.asList(
createServer("A", WIKI_SEGMENTS),
createServer("B", WIKI_SEGMENTS)
);
final int minSegmentsToMove = SegmentToMoveCalculator.computeMinSegmentsToMoveInTier(20_000);
Assert.assertEquals(100, minSegmentsToMove);
final int segmentsToMoveToFixSkew = SegmentToMoveCalculator
.computeNumSegmentsToMoveToBalanceTier(TIER, historicals);
Assert.assertEquals(0, segmentsToMoveToFixSkew);
// Find segmentsToMove with no limit on maxSegmentsToMove
final int segmentsToMove = SegmentToMoveCalculator
.computeNumSegmentsToMoveInTier(TIER, historicals, Integer.MAX_VALUE);
Assert.assertEquals(minSegmentsToMove, segmentsToMove);
}
@Test
public void testHalfSegmentsArePickedForMoveWhenFullSkew()
{
final List<ServerHolder> historicals = Arrays.asList(
createServer("A", WIKI_SEGMENTS),
createServer("B", Collections.emptyList())
);
final int minSegmentsToMove = SegmentToMoveCalculator.computeMinSegmentsToMoveInTier(10_000);
Assert.assertEquals(100, minSegmentsToMove);
final int segmentsToMoveToFixSkew = SegmentToMoveCalculator
.computeNumSegmentsToMoveToBalanceTier(TIER, historicals);
Assert.assertEquals(5_000, segmentsToMoveToFixSkew);
// Find segmentsToMove with no limit on maxSegmentsToMove
final int segmentsToMove = SegmentToMoveCalculator
.computeNumSegmentsToMoveInTier(TIER, historicals, Integer.MAX_VALUE);
Assert.assertEquals(segmentsToMoveToFixSkew, segmentsToMove);
}
@Test
public void testDatasourceWithLargestGapDeterminesNumToBalanceCounts()
{
// Both servers have all koala segments but only A has wiki segments
List<DataSegment> segmentsForServerA = new ArrayList<>(WIKI_SEGMENTS);
segmentsForServerA.addAll(KOALA_SEGMENTS);
final List<ServerHolder> historicals = Arrays.asList(
createServer("A", segmentsForServerA),
createServer("B", KOALA_SEGMENTS)
);
// Verify that half the wiki segments need to be moved for balance
int numToMoveToBalanceCount = SegmentToMoveCalculator
.computeSegmentsToMoveToBalanceCountsPerDatasource(TIER, historicals);
Assert.assertEquals(WIKI_SEGMENTS.size() / 2, numToMoveToBalanceCount);
}
private static int computeMaxSegmentsToMove(int totalSegments, int numThreads)
{
return SegmentToMoveCalculator.computeMaxSegmentsToMovePerTier(
totalSegments,
numThreads,
DEFAULT_COORDINATOR_PERIOD
);
}
private static int computeMaxSegmentsToMoveInPeriod(int totalSegments, Duration coordinatorPeriod)
{
return SegmentToMoveCalculator.computeMaxSegmentsToMovePerTier(totalSegments, 1, coordinatorPeriod);
}
private static int computeNumThreadsAndMaxToMove(int totalSegments)
{
return SegmentToMoveCalculator.computeMaxSegmentsToMovePerTier(
totalSegments,
SegmentLoadingConfig.computeNumBalancerThreads(totalSegments),
DEFAULT_COORDINATOR_PERIOD
);
}
private static int computeMinSegmentsToMove(int totalSegmentsInTier)
{
return SegmentToMoveCalculator.computeMinSegmentsToMoveInTier(totalSegmentsInTier);
}
private static ServerHolder createServer(String name, List<DataSegment> segments)
{
final DruidServer server
= new DruidServer(name, name, null, 10L << 30, ServerType.HISTORICAL, "tier1", 1);
segments.forEach(server::addDataSegment);
return new ServerHolder(server.toImmutableDruidServer(), new TestLoadQueuePeon());
}
}

View File

@ -38,6 +38,7 @@ import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
@ -378,7 +379,7 @@ public class BalanceSegmentsTest
private CoordinatorRunStats runBalancer(DruidCoordinatorRuntimeParams params)
{
params = new BalanceSegments().run(params);
params = new BalanceSegments(Duration.standardMinutes(1)).run(params);
if (params == null) {
Assert.fail("BalanceSegments duty returned null params");
return new CoordinatorRunStats();

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.loading;
import org.junit.Assert;
import org.junit.Test;
public class SegmentLoadingConfigTest
{
@Test
public void testComputeNumBalancerThreads()
{
Assert.assertEquals(1, computeBalancerThreads(0));
Assert.assertEquals(1, computeBalancerThreads(30_000));
Assert.assertEquals(2, computeBalancerThreads(50_000));
Assert.assertEquals(3, computeBalancerThreads(100_000));
Assert.assertEquals(4, computeBalancerThreads(175_000));
Assert.assertEquals(5, computeBalancerThreads(250_000));
Assert.assertEquals(6, computeBalancerThreads(350_000));
Assert.assertEquals(7, computeBalancerThreads(450_000));
Assert.assertEquals(8, computeBalancerThreads(600_000));
Assert.assertEquals(8, computeBalancerThreads(1_000_000));
Assert.assertEquals(8, computeBalancerThreads(10_000_000));
}
private int computeBalancerThreads(int numUsedSegments)
{
return SegmentLoadingConfig.computeNumBalancerThreads(numUsedSegments);
}
}

View File

@ -217,8 +217,8 @@ public abstract class CoordinatorSimulationBaseTest implements
{
static final String ASSIGNED_COUNT = "segment/assigned/count";
static final String MOVED_COUNT = "segment/moved/count";
static final String MOVE_SKIPPED = "segment/moveSkipped/count";
static final String DROPPED_COUNT = "segment/dropped/count";
static final String OVERSHADOWED_COUNT = "segment/overshadowed/count";
static final String DELETED_COUNT = "segment/deleted/count";
static final String LOAD_QUEUE_COUNT = "segment/loadQueue/count";
static final String DROP_QUEUE_COUNT = "segment/dropQueue/count";

View File

@ -74,7 +74,6 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class CoordinatorSimulationBuilder
{
private static final long DEFAULT_COORDINATOR_PERIOD = 100L;
private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper()
.setInjectableValues(
new InjectableValues.Std().addValue(
@ -463,8 +462,8 @@ public class CoordinatorSimulationBuilder
this.coordinatorConfig = new TestDruidCoordinatorConfig.Builder()
.withCoordinatorStartDelay(new Duration(1L))
.withCoordinatorPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD))
.withCoordinatorKillPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD))
.withCoordinatorPeriod(Duration.standardMinutes(1))
.withCoordinatorKillPeriod(Duration.millis(100))
.withLoadQueuePeonType("http")
.withCoordinatorKillIgnoreDurationToRetain(false)
.build();

View File

@ -26,6 +26,8 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* Coordinator simulation test to verify behaviour of segment balancing.
@ -229,4 +231,39 @@ public class SegmentBalancingTest extends CoordinatorSimulationBaseTest
Assert.assertTrue(getValue(Metric.MOVED_COUNT, null).intValue() > 0);
}
@Test(timeout = 60000L)
public void testMaxSegmentsAreMovedWhenClusterIsSkewed()
{
// Add 10 historicals of size 1 TB each
final long size1TB = 1_000_000;
List<DruidServer> historicals
= IntStream.range(0, 10)
.mapToObj(i -> createHistorical(i + 1, Tier.T1, size1TB))
.collect(Collectors.toList());
CoordinatorSimulation sim =
CoordinatorSimulation.builder()
.withSegments(Segments.KOALA_100X100D)
.withServers(historicals)
.withRules(DS.KOALA, Load.on(Tier.T1, 1).forever())
.build();
startSimulation(sim);
// Run 1: All segments are assigned to the 10 historicals
runCoordinatorCycle();
verifyValue(Metric.ASSIGNED_COUNT, 10_000L);
verifyNotEmitted(Metric.MOVED_COUNT);
verifyValue(Metric.MOVE_SKIPPED, 100L);
// Run 2: Add 10 more historicals, some segments are moved to them
for (int i = 11; i <= 20; ++i) {
addServer(createHistorical(i, Tier.T1, size1TB));
}
runCoordinatorCycle();
verifyValue(Metric.MOVED_COUNT, 500L);
verifyNotEmitted(Metric.MOVE_SKIPPED);
}
}