diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 09d8e2b6874..fd017dd5edf 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -542,26 +542,37 @@ public class DruidCoordinator } } + /** + * Resets the balancerExec if required and creates a new BalancerStrategy for + * the current coordinator run. + */ @VisibleForTesting - protected void initBalancerExecutor() + BalancerStrategy createBalancerStrategy(int balancerComputeThreads) { - final int currentNumber = getDynamicConfigs().getBalancerComputeThreads(); - + // Reset balancerExecutor if required if (balancerExec == null) { - balancerExec = createNewBalancerExecutor(currentNumber); - } else if (cachedBalancerThreadNumber != currentNumber) { + balancerExec = createNewBalancerExecutor(balancerComputeThreads); + } else if (cachedBalancerThreadNumber != balancerComputeThreads) { log.info( - "balancerComputeThreads has changed from [%d] to [%d], recreating the thread pool.", - cachedBalancerThreadNumber, - currentNumber + "'balancerComputeThreads' has changed from [%d] to [%d]", + cachedBalancerThreadNumber, balancerComputeThreads ); balancerExec.shutdownNow(); - balancerExec = createNewBalancerExecutor(currentNumber); + balancerExec = createNewBalancerExecutor(balancerComputeThreads); } + + // Create BalancerStrategy + final BalancerStrategy balancerStrategy = balancerStrategyFactory.createBalancerStrategy(balancerExec); + log.info( + "Using balancer strategy[%s] with [%d] threads.", + balancerStrategy.getClass().getSimpleName(), balancerComputeThreads + ); + return balancerStrategy; } private ListeningExecutorService createNewBalancerExecutor(int numThreads) { + log.info("Creating new balancer executor with [%d] threads.", numThreads); cachedBalancerThreadNumber = 
numThreads; return MoreExecutors.listeningDecorator( Execs.multiThreaded(numThreads, "coordinator-cost-balancer-%s") @@ -576,7 +587,7 @@ public class DruidCoordinator new UpdateReplicationStatus(), new UnloadUnusedSegments(loadQueueManager), new MarkOvershadowedSegmentsAsUnused(segmentsMetadataManager::markSegmentsAsUnused), - new BalanceSegments(), + new BalanceSegments(config.getCoordinatorPeriod()), new CollectSegmentAndServerStats(DruidCoordinator.this) ); } @@ -750,7 +761,7 @@ public class DruidCoordinator ); log.info( - "Emitted [%d] stats for group [%s]. All collected stats:%s\n", + "Emitted [%d] stats for group [%s]. All collected stats:%s", emittedCount.get(), dutyGroupName, allStats.buildStatsTable() ); } @@ -758,7 +769,7 @@ public class DruidCoordinator // Emit the runtime of the full DutiesRunnable final long runMillis = groupRunTime.stop().elapsed(TimeUnit.MILLISECONDS); emitStat(Stats.CoordinatorRun.GROUP_RUN_TIME, Collections.emptyMap(), runMillis); - log.info("Finished coordinator run for group [%s] in [%d] ms", dutyGroupName, runMillis); + log.info("Finished coordinator run for group [%s] in [%d] ms.%n", dutyGroupName, runMillis); } catch (Exception e) { log.makeAlert(e, "Caught exception, ignoring so that schedule keeps going.").emit(); @@ -818,15 +829,8 @@ public class DruidCoordinator final DruidCluster cluster = prepareCluster(dynamicConfig, segmentLoadingConfig, currentServers); cancelLoadsOnDecommissioningServers(cluster); - initBalancerExecutor(); - final BalancerStrategy balancerStrategy = balancerStrategyFactory.createBalancerStrategy(balancerExec); - log.info( - "Using balancer strategy [%s] with round-robin assignment [%s] and debug dimensions [%s].", - balancerStrategy.getClass().getSimpleName(), - segmentLoadingConfig.isUseRoundRobinSegmentAssignment(), - dynamicConfig.getDebugDimensions() - ); - + final BalancerStrategy balancerStrategy + = createBalancerStrategy(segmentLoadingConfig.getBalancerComputeThreads()); return 
params.buildFromExisting() .withDruidCluster(cluster) .withBalancerStrategy(balancerStrategy) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java b/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java index ea1f81ee6d4..a9767b8f768 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java @@ -33,18 +33,37 @@ import java.util.Map; */ public class SegmentCountsPerInterval { - private final Map> - datasourceIntervalToSegmentCount = new HashMap<>(); + private int totalSegments; + private long totalSegmentBytes; + private final Map> datasourceIntervalToSegmentCount = new HashMap<>(); private final Object2IntMap intervalToTotalSegmentCount = new Object2IntOpenHashMap<>(); + private final Object2IntMap datasourceToTotalSegmentCount = new Object2IntOpenHashMap<>(); public void addSegment(DataSegment segment) { updateCountInInterval(segment, 1); + totalSegmentBytes += segment.getSize(); } public void removeSegment(DataSegment segment) { updateCountInInterval(segment, -1); + totalSegmentBytes -= segment.getSize(); + } + + public int getTotalSegmentCount() + { + return totalSegments; + } + + public long getTotalSegmentBytes() + { + return totalSegmentBytes; + } + + public Object2IntMap getDatasourceToTotalSegmentCount() + { + return datasourceToTotalSegmentCount; } public Object2IntMap getIntervalToSegmentCount(String datasource) @@ -59,7 +78,9 @@ public class SegmentCountsPerInterval private void updateCountInInterval(DataSegment segment, int delta) { + totalSegments += delta; intervalToTotalSegmentCount.mergeInt(segment.getInterval(), delta, Integer::sum); + datasourceToTotalSegmentCount.mergeInt(segment.getDataSource(), delta, Integer::sum); datasourceIntervalToSegmentCount .computeIfAbsent(segment.getDataSource(), ds -> new Object2IntOpenHashMap<>()) 
.mergeInt(segment.getInterval(), delta, Integer::sum); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java index d55ac035d61..5de1bd5ee06 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java @@ -181,6 +181,23 @@ public class ServerHolder implements Comparable return server.getMaxSize(); } + /** + * Total projected disk usage of this server in bytes. + *

+ * The total size: + *
<ul>
+ *
<li>INCLUDES segments loaded on this server</li>
+ *
<li>INCLUDES segments loading on this server (actions: LOAD/REPLICATE)</li>
+ *
<li>INCLUDES segments moving to this server (action: MOVE_TO)</li>
+ *
<li>INCLUDES segments moving from this server (action: MOVE_FROM). This is + * because these segments have only been marked for drop. We include + * the size of these segments to avoid over-assigning the server in case the + * corresponding MOVE_TO operation gets delayed or fails.</li>
+ *
<li>EXCLUDES segments dropping from this server (action: DROP). Excluding + * these segments cannot result in over-assignment because drops are always + * processed before loads.</li>
+ * </ul>
+ */ public long getSizeUsed() { return server.getCurrSize() + sizeOfLoadingSegments - sizeOfDroppingSegments; @@ -317,6 +334,11 @@ public class ServerHolder implements Comparable return loadingReplicaCount; } + public int getNumQueuedSegments() + { + return queuedSegments.size(); + } + public boolean startOperation(SegmentAction action, DataSegment segment) { if (queuedSegments.containsKey(segment)) { @@ -349,7 +371,7 @@ public class ServerHolder implements Comparable } } - public boolean hasSegmentLoaded(SegmentId segmentId) + private boolean hasSegmentLoaded(SegmentId segmentId) { return server.getSegment(segmentId) != null; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategy.java index e133430b1b2..9def0328348 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategy.java @@ -72,8 +72,7 @@ public interface BalancerStrategy Iterator findServersToDropSegment(DataSegment segmentToDrop, List serverHolders); /** - * Returns the stats collected by the strategy in the current run and resets - * the stats collector for the next run. + * Returns the stats collected by the strategy. 
*/ - CoordinatorRunStats getAndResetStats(); + CoordinatorRunStats getStats(); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java index aae907bb599..96a6ccccf5c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java @@ -46,6 +46,7 @@ import java.util.List; import java.util.PriorityQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; public class CostBalancerStrategy implements BalancerStrategy @@ -68,6 +69,7 @@ public class CostBalancerStrategy implements BalancerStrategy .thenComparing(pair -> pair.rhs); private final CoordinatorRunStats stats = new CoordinatorRunStats(); + private final AtomicLong computeTimeNanos = new AtomicLong(0); public static double computeJointSegmentsCost(DataSegment segment, Iterable segmentSet) { @@ -263,9 +265,13 @@ public class CostBalancerStrategy implements BalancerStrategy } @Override - public CoordinatorRunStats getAndResetStats() + public CoordinatorRunStats getStats() { - return stats.getSnapshotAndReset(); + stats.add( + Stats.Balancer.COMPUTATION_TIME, + TimeUnit.NANOSECONDS.toMillis(computeTimeNanos.getAndSet(0)) + ); + return stats; } /** @@ -351,8 +357,8 @@ public class CostBalancerStrategy implements BalancerStrategy // Report computation stats computeTime.stop(); - stats.add(Stats.Balancer.COMPUTATION_COUNT, metricKey, 1); - stats.add(Stats.Balancer.COMPUTATION_TIME, metricKey, computeTime.elapsed(TimeUnit.MILLISECONDS)); + stats.add(Stats.Balancer.COMPUTATION_COUNT, 1); + computeTimeNanos.addAndGet(computeTime.elapsed(TimeUnit.NANOSECONDS)); return costPrioritizedServers.stream().map(pair -> pair.rhs) 
.collect(Collectors.toList()); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java index fc0da343102..b8e00005674 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java @@ -73,7 +73,7 @@ public class RandomBalancerStrategy implements BalancerStrategy } @Override - public CoordinatorRunStats getAndResetStats() + public CoordinatorRunStats getStats() { return CoordinatorRunStats.empty(); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java new file mode 100644 index 00000000000..d8846740a73 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.coordinator.balancer; + +import com.google.common.base.Preconditions; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.coordinator.SegmentCountsPerInterval; +import org.apache.druid.server.coordinator.ServerHolder; +import org.joda.time.Duration; + +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; + +/** + * Calculates the maximum, minimum and required number of segments to move in a + * Coordinator run for balancing. + */ +public class SegmentToMoveCalculator +{ + /** + * At least this number of segments must be picked for moving in every cycle + * to keep the cluster well balanced. + */ + private static final int MIN_SEGMENTS_TO_MOVE = 100; + + private static final Logger log = new Logger(SegmentToMoveCalculator.class); + + /** + * Calculates the number of segments to be picked for moving in the given tier, + * based on the level of skew between the historicals in the tier. + * + * @param tier Name of tier used for logging purposes + * @param historicals Active historicals in tier + * @param maxSegmentsToMoveInTier Maximum number of segments allowed to be moved + * in the tier. + * @return Number of segments to move in the tier in the range + * [{@link #MIN_SEGMENTS_TO_MOVE}, {@code maxSegmentsToMoveInTier}]. 
+ */ + public static int computeNumSegmentsToMoveInTier( + String tier, + List historicals, + int maxSegmentsToMoveInTier + ) + { + final int totalSegments = historicals.stream().mapToInt( + server -> server.getProjectedSegments().getTotalSegmentCount() + ).sum(); + + // Move at least some segments to ensure that the cluster is always balancing itself + final int minSegmentsToMove = SegmentToMoveCalculator + .computeMinSegmentsToMoveInTier(totalSegments); + final int segmentsToMoveToFixDeviation = SegmentToMoveCalculator + .computeNumSegmentsToMoveToBalanceTier(tier, historicals); + log.info( + "Need to move [%,d] segments in tier[%s] to attain balance. Allowed values are [min=%d, max=%d].", + segmentsToMoveToFixDeviation, tier, minSegmentsToMove, maxSegmentsToMoveInTier + ); + + final int activeSegmentsToMove = Math.max(minSegmentsToMove, segmentsToMoveToFixDeviation); + return Math.min(activeSegmentsToMove, maxSegmentsToMoveInTier); + } + + /** + * Calculates the minimum number of segments that should be considered for + * moving in a tier, so that the cluster is always balancing itself. + *

+ * This value must be calculated separately for every tier. + * + * @param totalSegmentsInTier Total number of all replicas of all segments + * loaded or queued across all historicals in the tier. + * @return {@code minSegmentsToMoveInTier} in the range + * [{@link #MIN_SEGMENTS_TO_MOVE}, {@code ~0.15% of totalSegmentsInTier}]. + */ + public static int computeMinSegmentsToMoveInTier(int totalSegmentsInTier) + { + // Divide by 2^16 and multiply by 100 so that the value increases + // in steps of 100 for every 2^16 = ~65k segments + int upperBound = (totalSegmentsInTier >> 16) * 100; + int lowerBound = Math.min(MIN_SEGMENTS_TO_MOVE, totalSegmentsInTier); + return Math.max(lowerBound, upperBound); + } + + /** + * Calculates the maximum number of segments that can be picked for moving in + * the cluster in a single coordinator run. + *

+ * This value must be calculated at the cluster level and then applied + * to every tier so that the total computation time is estimated correctly. + *

+ * Each balancer thread can perform 1 billion computations in 20s (see #14584). + * Therefore, keeping a buffer of 10s, in every 30s: + *
<pre>
+   * numComputations = maxSegmentsToMove * totalSegments
+   *
+   * maxSegmentsToMove = numComputations / totalSegments
+   *                   = (nThreads * 1B) / totalSegments
+   * </pre>
+ * + * @param totalSegments Total number of all replicas of all segments loaded or + * queued across all historicals in the cluster. + * @return {@code maxSegmentsToMove} per tier in the range + * [{@link #MIN_SEGMENTS_TO_MOVE}, ~20% of totalSegments]. + * @see #14584 + */ + public static int computeMaxSegmentsToMovePerTier( + int totalSegments, + int numBalancerThreads, + Duration coordinatorPeriod + ) + { + Preconditions.checkArgument( + numBalancerThreads > 0 && numBalancerThreads <= 100, + "Number of balancer threads must be in range (0, 100]." + ); + if (totalSegments <= 0) { + return 0; + } + + // Divide by 2^9 and multiply by 100 so that the upperBound + // increases in steps of 100 for every 2^9 = 512 segments (~20%) + final int upperBound = (totalSegments >> 9) * 100; + final int lowerBound = MIN_SEGMENTS_TO_MOVE; + + int num30sPeriods = Math.min(4, (int) (coordinatorPeriod.getMillis() / 30_000)); + + // Each thread can do ~1B computations in 30s = 1M * 1k = 2^20 * 1k + int maxComputationsInThousands = (numBalancerThreads * num30sPeriods) << 20; + int maxSegmentsToMove = (maxComputationsInThousands / totalSegments) * 1000; + + if (upperBound < lowerBound) { + return Math.min(lowerBound, totalSegments); + } else { + return Math.min(maxSegmentsToMove, upperBound); + } + } + + /** + * Computes the number of segments that need to be moved across the historicals + * in a tier to attain balance in terms of disk usage and segment counts per + * data source. 
+ * + * @param tier Name of the tier used only for logging purposes + * @param historicals List of historicals in the tier + */ + public static int computeNumSegmentsToMoveToBalanceTier(String tier, List historicals) + { + if (historicals.isEmpty()) { + return 0; + } + + return Math.max( + computeSegmentsToMoveToBalanceCountsPerDatasource(tier, historicals), + computeSegmentsToMoveToBalanceDiskUsage(tier, historicals) + ); + } + + private static double getAverageSegmentSize(List servers) + { + int totalSegmentCount = 0; + long totalUsageBytes = 0; + for (ServerHolder server : servers) { + totalSegmentCount += server.getProjectedSegments().getTotalSegmentCount(); + totalUsageBytes += server.getProjectedSegments().getTotalSegmentBytes(); + } + + if (totalSegmentCount <= 0 || totalUsageBytes <= 0) { + return 0; + } else { + return (1.0 * totalUsageBytes) / totalSegmentCount; + } + } + + /** + * Computes the number of segments to move across the servers of the tier in + * order to balance the segment counts of the most unbalanced datasource. 
+ */ + static int computeSegmentsToMoveToBalanceCountsPerDatasource( + String tier, + List servers + ) + { + // Find all the datasources + final Set datasources = servers.stream().flatMap( + s -> s.getProjectedSegments().getDatasourceToTotalSegmentCount().keySet().stream() + ).collect(Collectors.toSet()); + if (datasources.isEmpty()) { + return 0; + } + + // Compute the min and max number of segments for each datasource + final Object2IntMap datasourceToMaxSegments = new Object2IntOpenHashMap<>(); + final Object2IntMap datasourceToMinSegments = new Object2IntOpenHashMap<>(); + for (ServerHolder server : servers) { + final Object2IntMap datasourceToSegmentCount + = server.getProjectedSegments().getDatasourceToTotalSegmentCount(); + for (String datasource : datasources) { + int count = datasourceToSegmentCount.getInt(datasource); + datasourceToMaxSegments.mergeInt(datasource, count, Math::max); + datasourceToMinSegments.mergeInt(datasource, count, Math::min); + } + } + + // Compute the gap between min and max for each datasource and order by largest first + final TreeMap countDiffToDatasource = new TreeMap<>(Comparator.reverseOrder()); + datasourceToMaxSegments.object2IntEntrySet().forEach(entry -> { + String datasource = entry.getKey(); + int maxCount = entry.getIntValue(); + int minCount = datasourceToMinSegments.getInt(datasource); + countDiffToDatasource.put(maxCount - minCount, datasource); + }); + + // Identify the most unbalanced datasource + final Map.Entry maxCountDifference = countDiffToDatasource.firstEntry(); + String mostUnbalancedDatasource = maxCountDifference.getValue(); + int minNumSegments = Integer.MAX_VALUE; + int maxNumSegments = 0; + for (ServerHolder server : servers) { + int countForSkewedDatasource = server.getProjectedSegments() + .getDatasourceToTotalSegmentCount() + .getInt(mostUnbalancedDatasource); + + minNumSegments = Math.min(minNumSegments, countForSkewedDatasource); + maxNumSegments = Math.max(maxNumSegments, 
countForSkewedDatasource); + } + + final int numSegmentsToMove = maxCountDifference.getKey() / 2; + if (numSegmentsToMove > 0) { + log.info( + "Need to move [%,d] segments of datasource[%s] in tier[%s] to fix gap between min[%,d] and max[%,d].", + numSegmentsToMove, mostUnbalancedDatasource, tier, minNumSegments, maxNumSegments + ); + } + return numSegmentsToMove; + } + + private static int computeSegmentsToMoveToBalanceDiskUsage( + String tier, + List servers + ) + { + if (servers.isEmpty()) { + return 0; + } + + double maxUsagePercent = 0.0; + double minUsagePercent = 100.0; + + long maxUsageBytes = 0; + long minUsageBytes = Long.MAX_VALUE; + for (ServerHolder server : servers) { + final SegmentCountsPerInterval projectedSegments = server.getProjectedSegments(); + + // Track the maximum and minimum values + long serverUsageBytes = projectedSegments.getTotalSegmentBytes(); + maxUsageBytes = Math.max(serverUsageBytes, maxUsageBytes); + minUsageBytes = Math.min(serverUsageBytes, minUsageBytes); + + double diskUsage = server.getMaxSize() <= 0 + ? 0 : (100.0 * projectedSegments.getTotalSegmentBytes()) / server.getMaxSize(); + maxUsagePercent = Math.max(diskUsage, maxUsagePercent); + minUsagePercent = Math.min(diskUsage, minUsagePercent); + } + + final double averageSegmentSize = getAverageSegmentSize(servers); + final long differenceInUsageBytes = maxUsageBytes - minUsageBytes; + final int numSegmentsToMove = averageSegmentSize <= 0 + ? 
0 : (int) (differenceInUsageBytes / averageSegmentSize) / 2; + + if (numSegmentsToMove > 0) { + log.info( + "Need to move [%,d] segments of avg size [%,d MB] in tier[%s] to fix" + + " disk usage gap between min[%d GB][%.1f%%] and max[%d GB][%.1f%%].", + numSegmentsToMove, ((long) averageSegmentSize) >> 20, tier, + minUsageBytes >> 30, minUsagePercent, maxUsageBytes >> 30, maxUsagePercent + ); + } + return numSegmentsToMove; + } + + private SegmentToMoveCalculator() + { + // no instantiation + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java index 86e5b996293..1523576b11c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java @@ -21,9 +21,9 @@ package org.apache.druid.server.coordinator.balancer; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.ServerHolder; -import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig; import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; import org.apache.druid.server.coordinator.stats.Dimension; @@ -55,27 +55,23 @@ public class TierSegmentBalancer private final DruidCoordinatorRuntimeParams params; private final StrategicSegmentAssigner segmentAssigner; - private final SegmentLoadingConfig loadingConfig; private final CoordinatorRunStats runStats; private final List activeServers; private final List decommissioningServers; - private final int totalMaxSegmentsToMove; - + private final 
int maxSegmentsToMove; private final int movingSegmentCount; public TierSegmentBalancer( String tier, Set servers, + int maxSegmentsToMove, DruidCoordinatorRuntimeParams params ) { this.tier = tier; this.params = params; this.segmentAssigner = params.getSegmentAssigner(); - - this.loadingConfig = params.getSegmentLoadingConfig(); - this.totalMaxSegmentsToMove = loadingConfig.getMaxSegmentsToMove(); this.runStats = segmentAssigner.getStats(); Map> partitions = @@ -84,55 +80,29 @@ public class TierSegmentBalancer this.activeServers = partitions.get(false); this.movingSegmentCount = activeServers.stream().mapToInt(ServerHolder::getNumMovingSegments).sum(); + this.maxSegmentsToMove = maxSegmentsToMove; } public void run() { - if (activeServers.isEmpty() || (activeServers.size() <= 1 && decommissioningServers.isEmpty())) { - log.warn( - "Skipping balance for tier [%s] with [%d] active servers and [%d] decomissioning servers.", - tier, activeServers.size(), decommissioningServers.size() - ); - return; - } + int numDecommSegmentsToMove = getNumDecommSegmentsToMove(maxSegmentsToMove); + moveSegmentsFrom(decommissioningServers, numDecommSegmentsToMove, "decommissioning"); - log.info( - "Moving max [%d] segments in tier [%s] with [%d] active servers and" - + " [%d] decommissioning servers. 
There are [%d] segments already in queue.", - totalMaxSegmentsToMove, tier, activeServers.size(), decommissioningServers.size(), movingSegmentCount - ); - - // Move segments from decommissioning to active servers - int movedDecommSegments = 0; - if (!decommissioningServers.isEmpty()) { - int maxDecommPercentToMove = loadingConfig.getPercentDecommSegmentsToMove(); - int maxDecommSegmentsToMove = (int) Math.ceil(totalMaxSegmentsToMove * (maxDecommPercentToMove / 100.0)); - movedDecommSegments += - moveSegmentsFromTo(decommissioningServers, activeServers, maxDecommSegmentsToMove); - log.info( - "Moved [%d] segments out of max [%d (%d%%)] from decommissioning to active servers in tier [%s].", - movedDecommSegments, maxDecommSegmentsToMove, maxDecommPercentToMove, tier - ); - } - - // Move segments across active servers - int maxGeneralSegmentsToMove = totalMaxSegmentsToMove - movedDecommSegments; - int movedGeneralSegments = - moveSegmentsFromTo(activeServers, activeServers, maxGeneralSegmentsToMove); - log.info( - "Moved [%d] segments out of max [%d] between active servers in tier [%s].", - movedGeneralSegments, maxGeneralSegmentsToMove, tier - ); + int numActiveSegmentsToMove = getNumActiveSegmentsToMove(maxSegmentsToMove - numDecommSegmentsToMove); + moveSegmentsFrom(activeServers, numActiveSegmentsToMove, "active"); } - private int moveSegmentsFromTo( - List sourceServers, - List destServers, - int maxSegmentsToMove + /** + * Moves segments from the given source servers to the active servers in this tier. 
+ */ + private void moveSegmentsFrom( + final List sourceServers, + final int numSegmentsToMove, + final String sourceServerType ) { - if (maxSegmentsToMove <= 0 || sourceServers.isEmpty() || destServers.isEmpty()) { - return 0; + if (numSegmentsToMove <= 0 || sourceServers.isEmpty() || activeServers.isEmpty()) { + return; } final Set broadcastDatasources = params.getBroadcastDatasources(); @@ -140,25 +110,30 @@ public class TierSegmentBalancer // Always move loading segments first as it is a cheaper operation List pickedSegments = ReservoirSegmentSampler.pickMovableSegmentsFrom( sourceServers, - maxSegmentsToMove, + numSegmentsToMove, ServerHolder::getLoadingSegments, broadcastDatasources ); - int movedCount = moveSegmentsTo(destServers, pickedSegments, maxSegmentsToMove); + int movedCount = moveSegmentsTo(activeServers, pickedSegments, numSegmentsToMove); // Move loaded segments only if tier is not already busy moving segments if (movingSegmentCount <= 0) { - maxSegmentsToMove -= movedCount; + int numLoadedSegmentsToMove = numSegmentsToMove - movedCount; pickedSegments = ReservoirSegmentSampler.pickMovableSegmentsFrom( sourceServers, - maxSegmentsToMove, + numLoadedSegmentsToMove, server -> server.getServer().iterateAllSegments(), broadcastDatasources ); - movedCount += moveSegmentsTo(destServers, pickedSegments, maxSegmentsToMove); + movedCount += moveSegmentsTo(activeServers, pickedSegments, numLoadedSegmentsToMove); + } else { + log.info("There are already [%,d] segments moving in tier[%s].", movingSegmentCount, tier); } - return movedCount; + log.info( + "Moved [%,d of %,d] segments from [%d] [%s] servers in tier [%s].", + movedCount, numSegmentsToMove, sourceServers.size(), sourceServerType, tier + ); } private int moveSegmentsTo( @@ -221,4 +196,39 @@ public class TierSegmentBalancer runStats.add(Stats.Segments.MOVE_SKIPPED, key, 1); } + /** + * Number of segments to move away from the decommissioning historicals of this tier. 
+ */ + private int getNumDecommSegmentsToMove(int maxSegmentsToMove) + { + final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig(); + if (decommissioningServers.isEmpty() || activeServers.isEmpty()) { + return 0; + } else if (dynamicConfig.isSmartSegmentLoading()) { + final int decommSegmentsToMove = decommissioningServers.stream().mapToInt( + server -> server.getProjectedSegments().getTotalSegmentCount() + ).sum(); + return Math.min(decommSegmentsToMove, maxSegmentsToMove); + } else { + int maxPercentageToMove = dynamicConfig.getDecommissioningMaxPercentOfMaxSegmentsToMove(); + return (int) Math.ceil(maxSegmentsToMove * (maxPercentageToMove / 100.0)); + } + } + + /** + * Number of segments to move between the active historicals of this tier. + */ + private int getNumActiveSegmentsToMove(int maxActiveSegmentsToMove) + { + final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig(); + if (activeServers.size() < 2) { + return 0; + } else if (dynamicConfig.isSmartSegmentLoading()) { + return SegmentToMoveCalculator.computeNumSegmentsToMoveInTier(tier, activeServers, maxActiveSegmentsToMove); + } else { + // If smartSegmentLoading is disabled, just use the configured value + return maxActiveSegmentsToMove; + } + } + } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java index 4ff2e657438..37e58ff28f8 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java @@ -19,12 +19,19 @@ package org.apache.druid.server.coordinator.duty; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.server.coordinator.DruidCluster; import 
org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.ServerHolder; +import org.apache.druid.server.coordinator.balancer.SegmentToMoveCalculator; import org.apache.druid.server.coordinator.balancer.TierSegmentBalancer; import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.joda.time.Duration; + +import java.util.Set; /** * @@ -33,6 +40,13 @@ public class BalanceSegments implements CoordinatorDuty { private static final EmittingLogger log = new EmittingLogger(BalanceSegments.class); + private final Duration coordinatorPeriod; + + public BalanceSegments(Duration coordinatorPeriod) + { + this.coordinatorPeriod = coordinatorPeriod; + } + @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { @@ -43,27 +57,79 @@ public class BalanceSegments implements CoordinatorDuty final DruidCluster cluster = params.getDruidCluster(); final SegmentLoadingConfig loadingConfig = params.getSegmentLoadingConfig(); - final int maxSegmentsToMove = loadingConfig.getMaxSegmentsToMove(); + + final int maxSegmentsToMove = getMaxSegmentsToMove(params); if (maxSegmentsToMove <= 0) { log.info("Skipping balance as maxSegmentsToMove is [%d].", maxSegmentsToMove); return params; } else { log.info( - "Balancing segments in tiers [%s] with maxSegmentsToMove=[%d], maxLifetime=[%d].", + "Balancing segments in tiers [%s] with maxSegmentsToMove[%,d] and maxLifetime[%d].", cluster.getTierNames(), maxSegmentsToMove, loadingConfig.getMaxLifetimeInLoadQueue() ); } cluster.getHistoricals().forEach( - (tier, servers) -> new TierSegmentBalancer(tier, servers, params).run() + (tier, servers) -> new TierSegmentBalancer(tier, servers, maxSegmentsToMove, params).run() ); CoordinatorRunStats runStats = params.getCoordinatorStats(); params.getBalancerStrategy() - .getAndResetStats() + .getStats() .forEachStat(runStats::add); 
return params; } + /** + * Recomputes the value of {@code maxSegmentsToMove} if smart segment loading + * is enabled. {@code maxSegmentsToMove} defines only the upper bound, the actual + * number of segments picked for moving is determined by the {@link TierSegmentBalancer} + * based on the level of skew in the tier. + */ + private int getMaxSegmentsToMove(DruidCoordinatorRuntimeParams params) + { + final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig(); + if (dynamicConfig.isSmartSegmentLoading()) { + final Pair numHistoricalsAndSegments = getNumHistoricalsAndSegments(params.getDruidCluster()); + final int totalSegmentsInCluster = numHistoricalsAndSegments.rhs; + + final int numBalancerThreads = params.getSegmentLoadingConfig().getBalancerComputeThreads(); + final int maxSegmentsToMove = SegmentToMoveCalculator + .computeMaxSegmentsToMovePerTier(totalSegmentsInCluster, numBalancerThreads, coordinatorPeriod); + log.info( + "Computed maxSegmentsToMove[%,d] for total [%,d] segments on [%d] historicals.", + maxSegmentsToMove, totalSegmentsInCluster, numHistoricalsAndSegments.lhs + ); + + return maxSegmentsToMove; + } else { + return dynamicConfig.getMaxSegmentsToMove(); + } + } + + /** + * Calculates the total number of historicals (active and decommissioning) and + * the total number of segments on these historicals that would participate in + * cost computations. This includes all replicas of all loaded, loading, dropping + * and moving segments. + *

+ * This is calculated here to ensure that all assignments done by the preceding + * {@link RunRules} duty are accounted for. + */ + private Pair getNumHistoricalsAndSegments(DruidCluster cluster) + { + int numHistoricals = 0; + int numSegments = 0; + + for (Set historicals : cluster.getHistoricals().values()) { + for (ServerHolder historical : historicals) { + ++numHistoricals; + numSegments += historical.getServer().getNumSegments() + historical.getNumQueuedSegments(); + } + } + + return Pair.of(numHistoricals, numSegments); + } + } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java index 1c0ff573d19..8c5acaeebb6 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java @@ -144,8 +144,8 @@ public class CollectSegmentAndServerStats implements CoordinatorDuty final int numHistoricals = historicals.size(); log.info( - "Tier [%s] is serving [%,d], loading [%,d] and dropping [%,d] segments" - + " across [%d] historicals with average usage [%d GBs], [%.1f%%].", + "Tier[%s] is serving [%,d], loading [%,d] and dropping [%,d] segments" + + " across [%d] historicals with average usage[%d GBs], [%.1f%%].", tier, servedCount.get(), loadingCount.get(), droppingCount.get(), numHistoricals, (currentBytesSum.get() >> 30) / numHistoricals, usageSum.get() / numHistoricals ); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java index 8bdcee3641d..5a135d81cf4 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java +++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java @@ -97,7 +97,7 @@ public class UnloadUnusedSegments implements CoordinatorDuty && loadQueueManager.dropSegment(segment, serverHolder)) { totalUnneededCount++; log.debug( - "Dropping uneeded segment[%s] from server[%s] in tier[%s]", + "Dropping uneeded segment[%s] from server[%s] in tier[%s].", segment.getId(), server.getName(), server.getTier() ); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java index bb42e95eaae..f08b7ed5ca8 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java @@ -35,33 +35,27 @@ public class SegmentLoadingConfig private final int maxReplicaAssignmentsInRun; private final int maxLifetimeInLoadQueue; - private final int maxSegmentsToMove; - private final int percentDecommSegmentsToMove; + private final int balancerComputeThreads; private final boolean useRoundRobinSegmentAssignment; /** - * Creates a new SegmentLoadingConfig with recomputed coordinator config values from + * Creates a new SegmentLoadingConfig with recomputed coordinator config values * based on whether {@link CoordinatorDynamicConfig#isSmartSegmentLoading()} * is enabled or not. 
*/ public static SegmentLoadingConfig create(CoordinatorDynamicConfig dynamicConfig, int numUsedSegments) { if (dynamicConfig.isSmartSegmentLoading()) { - // Compute recommended values - // Impose a lower bound on both replicationThrottleLimit and maxSegmentsToMove + // Compute replicationThrottleLimit with a lower bound of 100 final int throttlePercentage = 2; final int replicationThrottleLimit = Math.max(100, numUsedSegments * throttlePercentage / 100); - - // Impose an upper bound on maxSegmentsToMove to ensure that coordinator - // run times are bounded. This limit can be relaxed as performance of - // the CostBalancerStrategy.computeCost() is improved. - final int maxSegmentsToMove = Math.min(1000, replicationThrottleLimit); + final int balancerComputeThreads = computeNumBalancerThreads(numUsedSegments); log.info( - "Smart segment loading is enabled. Recomputed replicationThrottleLimit" - + " [%,d] (%d%% of used segments [%,d]) and maxSegmentsToMove [%,d].", - replicationThrottleLimit, throttlePercentage, numUsedSegments, maxSegmentsToMove + "Smart segment loading is enabled. 
Calculated balancerComputeThreads[%d]" + + " and replicationThrottleLimit[%,d] (%d%% of used segments[%,d]).", + balancerComputeThreads, replicationThrottleLimit, throttlePercentage, numUsedSegments ); return new SegmentLoadingConfig( @@ -69,9 +63,8 @@ public class SegmentLoadingConfig replicationThrottleLimit, Integer.MAX_VALUE, 60, - maxSegmentsToMove, - 100, - true + true, + balancerComputeThreads ); } else { // Use the configured values @@ -80,9 +73,8 @@ public class SegmentLoadingConfig dynamicConfig.getReplicationThrottleLimit(), dynamicConfig.getMaxNonPrimaryReplicantsToLoad(), dynamicConfig.getReplicantLifetime(), - dynamicConfig.getMaxSegmentsToMove(), - dynamicConfig.getDecommissioningMaxPercentOfMaxSegmentsToMove(), - dynamicConfig.isUseRoundRobinSegmentAssignment() + dynamicConfig.isUseRoundRobinSegmentAssignment(), + dynamicConfig.getBalancerComputeThreads() ); } } @@ -92,18 +84,16 @@ public class SegmentLoadingConfig int replicationThrottleLimit, int maxReplicaAssignmentsInRun, int maxLifetimeInLoadQueue, - int maxSegmentsToMove, - int percentDecommSegmentsToMove, - boolean useRoundRobinSegmentAssignment + boolean useRoundRobinSegmentAssignment, + int balancerComputeThreads ) { this.maxSegmentsInLoadQueue = maxSegmentsInLoadQueue; this.replicationThrottleLimit = replicationThrottleLimit; this.maxReplicaAssignmentsInRun = maxReplicaAssignmentsInRun; this.maxLifetimeInLoadQueue = maxLifetimeInLoadQueue; - this.maxSegmentsToMove = maxSegmentsToMove; - this.percentDecommSegmentsToMove = percentDecommSegmentsToMove; this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment; + this.balancerComputeThreads = balancerComputeThreads; } public int getMaxSegmentsInLoadQueue() @@ -111,11 +101,6 @@ public class SegmentLoadingConfig return maxSegmentsInLoadQueue; } - public int getMaxSegmentsToMove() - { - return maxSegmentsToMove; - } - public int getReplicationThrottleLimit() { return replicationThrottleLimit; @@ -136,8 +121,35 @@ public class 
SegmentLoadingConfig return maxReplicaAssignmentsInRun; } - public int getPercentDecommSegmentsToMove() + public int getBalancerComputeThreads() { - return percentDecommSegmentsToMove; + return balancerComputeThreads; + } + + /** + * Computes the number of threads to be used in the balancing executor. + * The number of used segments in a cluster is generally a good indicator of + * the cluster size and has been used here as a proxy for the actual number of + * segments that would be involved in cost computations. + *

+ * The number of threads increases by 1 first for every 50k segments, then for + * every 75k segments and so on. + * + * @return Number of {@code balancerComputeThreads} in the range [1, 8]. + */ + public static int computeNumBalancerThreads(int numUsedSegments) + { + // Add an extra thread when numUsedSegments increases by a step + final int[] stepValues = {50, 50, 75, 75, 100, 100, 150, 150}; + + int remainder = numUsedSegments / 1000; + for (int step = 0; step < stepValues.length; ++step) { + remainder -= stepValues[step]; + if (remainder < 0) { + return step + 1; + } + } + + return stepValues.length; } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java index 6d0e1b8ae85..cbee9744524 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java @@ -255,6 +255,8 @@ public class CoordinatorRunStats { if (debugDimensions.isEmpty()) { return false; + } else if (rowKey.getValues().isEmpty()) { + return true; } for (Map.Entry entry : rowKey.getValues().entrySet()) { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java index d1df9126989..63a0944ed7f 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java @@ -38,6 +38,7 @@ import org.apache.druid.server.coordinator.rules.Rule; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; import org.easymock.EasyMock; +import org.joda.time.Duration; import org.joda.time.Interval; import org.joda.time.Period; import org.junit.Before; @@ -142,7 
+143,7 @@ public class BalanceSegmentsProfiler .withDatabaseRuleManager(manager) .build(); - BalanceSegments tester = new BalanceSegments(); + BalanceSegments tester = new BalanceSegments(Duration.standardMinutes(1)); RunRules runner = new RunRules(Set::size); watch.start(); DruidCoordinatorRuntimeParams balanceParams = tester.run(params); @@ -188,7 +189,7 @@ public class BalanceSegmentsProfiler .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build()) .withSegmentAssignerUsing(loadQueueManager) .build(); - BalanceSegments tester = new BalanceSegments(); + BalanceSegments tester = new BalanceSegments(Duration.standardMinutes(1)); watch.start(); DruidCoordinatorRuntimeParams balanceParams = tester.run(params); System.out.println(watch.stop()); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index cf53708af00..0e75f95a1fb 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -51,6 +51,7 @@ import org.apache.druid.metadata.SegmentsMetadataManager; import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory; +import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategyFactory; import org.apache.druid.server.coordinator.duty.CompactSegments; import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty; import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup; @@ -585,25 +586,12 @@ public class DruidCoordinatorTest extends CuratorTestBase @Test public void testBalancerThreadNumber() { - CoordinatorDynamicConfig dynamicConfig = EasyMock.createNiceMock(CoordinatorDynamicConfig.class); - 
EasyMock.expect(dynamicConfig.getBalancerComputeThreads()).andReturn(5).times(2); - EasyMock.expect(dynamicConfig.getBalancerComputeThreads()).andReturn(10).once(); - - JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class); - EasyMock.expect( - configManager.watch( - EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY), - EasyMock.anyObject(Class.class), - EasyMock.anyObject() - ) - ).andReturn(new AtomicReference<>(dynamicConfig)).anyTimes(); - ScheduledExecutorFactory scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class); - EasyMock.replay(configManager, dynamicConfig, scheduledExecutorFactory); + EasyMock.replay(scheduledExecutorFactory); DruidCoordinator c = new DruidCoordinator( druidCoordinatorConfig, - configManager, + EasyMock.createNiceMock(JacksonConfigManager.class), null, null, null, @@ -617,7 +605,7 @@ public class DruidCoordinatorTest extends CuratorTestBase null, null, new CoordinatorCustomDutyGroups(ImmutableSet.of()), - null, + new RandomBalancerStrategyFactory(), null, null, null @@ -628,20 +616,20 @@ public class DruidCoordinatorTest extends CuratorTestBase Assert.assertNull(c.getBalancerExec()); // first initialization - c.initBalancerExecutor(); + c.createBalancerStrategy(5); Assert.assertEquals(5, c.getCachedBalancerThreadNumber()); ListeningExecutorService firstExec = c.getBalancerExec(); Assert.assertNotNull(firstExec); // second initialization, expect no changes as cachedBalancerThreadNumber is not changed - c.initBalancerExecutor(); + c.createBalancerStrategy(5); Assert.assertEquals(5, c.getCachedBalancerThreadNumber()); ListeningExecutorService secondExec = c.getBalancerExec(); Assert.assertNotNull(secondExec); Assert.assertSame(firstExec, secondExec); // third initialization, expect executor recreated as cachedBalancerThreadNumber is changed to 10 - c.initBalancerExecutor(); + c.createBalancerStrategy(10); Assert.assertEquals(10, c.getCachedBalancerThreadNumber()); 
ListeningExecutorService thirdExec = c.getBalancerExec(); Assert.assertNotNull(thirdExec); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java index 11e36dea183..70fb2dd0b6e 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java @@ -193,8 +193,6 @@ public class ServerHolderTest ); Assert.assertTrue(h1.isServingSegment(SEGMENTS.get(0))); Assert.assertFalse(h1.isServingSegment(SEGMENTS.get(1))); - Assert.assertTrue(h1.hasSegmentLoaded(SEGMENTS.get(0).getId())); - Assert.assertFalse(h1.hasSegmentLoaded(SEGMENTS.get(1).getId())); Assert.assertFalse(h1.isLoadQueueFull()); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java index e4b3715960f..576dfbb03d7 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java @@ -31,8 +31,6 @@ import org.apache.druid.server.coordinator.CreateDataSegments; import org.apache.druid.server.coordinator.ServerHolder; import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; -import org.apache.druid.server.coordinator.stats.Dimension; -import org.apache.druid.server.coordinator.stats.RowKey; import org.apache.druid.server.coordinator.stats.Stats; import org.apache.druid.timeline.DataSegment; import org.junit.After; @@ -317,7 +315,7 @@ public class CostBalancerStrategyTest } @Test - public void testGetAndResetStats() + public void testGetStats() { final ServerHolder serverA = new ServerHolder( 
createHistorical().toImmutableDruidServer(), @@ -332,20 +330,13 @@ public class CostBalancerStrategyTest // Verify that computation stats have been tracked strategy.findServersToLoadSegment(segment, Arrays.asList(serverA, serverB)); - CoordinatorRunStats computeStats = strategy.getAndResetStats(); + CoordinatorRunStats computeStats = strategy.getStats(); - final RowKey rowKey = RowKey.with(Dimension.DATASOURCE, DS_WIKI) - .with(Dimension.DESCRIPTION, "LOAD") - .and(Dimension.TIER, "hot"); - Assert.assertEquals(1L, computeStats.get(Stats.Balancer.COMPUTATION_COUNT, rowKey)); + Assert.assertEquals(1L, computeStats.get(Stats.Balancer.COMPUTATION_COUNT)); - long computeTime = computeStats.get(Stats.Balancer.COMPUTATION_TIME, rowKey); + long computeTime = computeStats.get(Stats.Balancer.COMPUTATION_TIME); Assert.assertTrue(computeTime >= 0 && computeTime <= 100); Assert.assertFalse(computeStats.hasStat(Stats.Balancer.COMPUTATION_ERRORS)); - - // Verify that stats have been reset - computeStats = strategy.getAndResetStats(); - Assert.assertEquals(0, computeStats.rowCount()); } @Test diff --git a/server/src/test/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculatorTest.java new file mode 100644 index 00000000000..58c1cde4099 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculatorTest.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.balancer; + +import org.apache.druid.client.DruidServer; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordinator.CreateDataSegments; +import org.apache.druid.server.coordinator.ServerHolder; +import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig; +import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.Duration; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +public class SegmentToMoveCalculatorTest +{ + + private static final Duration DEFAULT_COORDINATOR_PERIOD = Duration.standardMinutes(1); + + /** + * 100 days x 100 partitions = 10,000 segments. + */ + private static final List WIKI_SEGMENTS + = CreateDataSegments.ofDatasource("wiki") + .forIntervals(100, Granularities.DAY) + .withNumPartitions(100) + .eachOfSizeInMb(500); + + /** + * 10 days * 1 partitions = 10 segments. 
+   */
+  private static final List<DataSegment> KOALA_SEGMENTS
+      = CreateDataSegments.ofDatasource("koala")
+                          .forIntervals(10, Granularities.DAY)
+                          .eachOfSizeInMb(500);
+
+  private static final String TIER = "tier1";
+
+  @Test
+  public void testMaxSegmentsToMove1Thread()
+  {
+    Assert.assertEquals(0, computeMaxSegmentsToMove(0, 1));
+    Assert.assertEquals(50, computeMaxSegmentsToMove(50, 1));
+    Assert.assertEquals(100, computeMaxSegmentsToMove(100, 1));
+    // Expected values in the next two groups are 100 * (totalSegments / 512), using integer division
+    Assert.assertEquals(100, computeMaxSegmentsToMove(512, 1));
+    Assert.assertEquals(200, computeMaxSegmentsToMove(1_024, 1));
+    Assert.assertEquals(300, computeMaxSegmentsToMove(1_536, 1));
+
+    Assert.assertEquals(1_900, computeMaxSegmentsToMove(10_000, 1));
+    Assert.assertEquals(9_700, computeMaxSegmentsToMove(50_000, 1));
+    Assert.assertEquals(19_500, computeMaxSegmentsToMove(100_000, 1));
+    // For these larger totals the expected maximum shrinks as the total grows
+    Assert.assertEquals(10_000, computeMaxSegmentsToMove(200_000, 1));
+    Assert.assertEquals(4_000, computeMaxSegmentsToMove(500_000, 1));
+    Assert.assertEquals(2_000, computeMaxSegmentsToMove(1_000_000, 1));
+  }
+
+  @Test
+  public void testMaxSegmentsToMoveIncreasesWithCoordinatorPeriod()
+  {
+    Assert.assertEquals(5_000, computeMaxSegmentsToMoveInPeriod(200_000, Duration.millis(30_000)));
+    Assert.assertEquals(10_000, computeMaxSegmentsToMoveInPeriod(200_000, Duration.millis(60_000)));
+    Assert.assertEquals(15_000, computeMaxSegmentsToMoveInPeriod(200_000, Duration.millis(90_000)));
+    Assert.assertEquals(20_000, computeMaxSegmentsToMoveInPeriod(200_000, Duration.millis(120_000)));
+    // Same periods with 500_000 segments: the expected maximum again grows in proportion to the period
+    Assert.assertEquals(2_000, computeMaxSegmentsToMoveInPeriod(500_000, Duration.millis(30_000)));
+    Assert.assertEquals(4_000, computeMaxSegmentsToMoveInPeriod(500_000, Duration.millis(60_000)));
+    Assert.assertEquals(6_000, computeMaxSegmentsToMoveInPeriod(500_000, Duration.millis(90_000)));
+    Assert.assertEquals(8_000, computeMaxSegmentsToMoveInPeriod(500_000, Duration.millis(120_000)));
+  }
+
+  @Test
+  public void testMaxSegmentsToMove8Threads()
+  {
+    Assert.assertEquals(0, computeMaxSegmentsToMove(0, 8));
+    Assert.assertEquals(50, computeMaxSegmentsToMove(50, 8));
+    Assert.assertEquals(100, computeMaxSegmentsToMove(100, 8));
+
+    Assert.assertEquals(100, computeMaxSegmentsToMove(512, 8));
+    Assert.assertEquals(200, computeMaxSegmentsToMove(1_024, 8));
+    Assert.assertEquals(300, computeMaxSegmentsToMove(1_536, 8));
+    // Large totals: expected maximum is higher than with 1 thread (e.g. 500_000: 33_000 vs 4_000)
+    Assert.assertEquals(33_000, computeMaxSegmentsToMove(500_000, 8));
+    Assert.assertEquals(16_000, computeMaxSegmentsToMove(1_000_000, 8));
+    Assert.assertEquals(8_000, computeMaxSegmentsToMove(2_000_000, 8));
+    Assert.assertEquals(3_000, computeMaxSegmentsToMove(5_000_000, 8));
+    Assert.assertEquals(1_000, computeMaxSegmentsToMove(10_000_000, 8));
+  }
+
+  @Test
+  public void testMaxSegmentsToMoveWithComputedNumThreads()
+  {
+    Assert.assertEquals(1_900, computeNumThreadsAndMaxToMove(10_000));
+    Assert.assertEquals(9_700, computeNumThreadsAndMaxToMove(50_000));
+
+    Assert.assertEquals(19_500, computeNumThreadsAndMaxToMove(100_000));
+    Assert.assertEquals(39_000, computeNumThreadsAndMaxToMove(200_000));
+    Assert.assertEquals(29_000, computeNumThreadsAndMaxToMove(500_000));
+
+    Assert.assertEquals(16_000, computeNumThreadsAndMaxToMove(1_000_000));
+    Assert.assertEquals(8_000, computeNumThreadsAndMaxToMove(2_000_000));
+    Assert.assertEquals(1_000, computeNumThreadsAndMaxToMove(10_000_000));
+  }
+
+  @Test
+  public void testMinSegmentsToMove()
+  {
+    Assert.assertEquals(0, computeMinSegmentsToMove(0));
+    Assert.assertEquals(50, computeMinSegmentsToMove(50));
+
+    Assert.assertEquals(100, computeMinSegmentsToMove(100));
+    Assert.assertEquals(100, computeMinSegmentsToMove(1_000));
+    // Expected minimum stays at 100 up to 100_000 segments, then grows with the total
+    Assert.assertEquals(100, computeMinSegmentsToMove(20_000));
+    Assert.assertEquals(100, computeMinSegmentsToMove(50_000));
+    Assert.assertEquals(100, computeMinSegmentsToMove(100_000));
+    Assert.assertEquals(300, computeMinSegmentsToMove(200_000));
+    Assert.assertEquals(700, computeMinSegmentsToMove(500_000));
+    Assert.assertEquals(1_500, computeMinSegmentsToMove(1_000_000));
+    Assert.assertEquals(15_200, computeMinSegmentsToMove(10_000_000));
+  }
+
+  @Test
+  public void testMinSegmentsToMoveIncreasesInSteps()
+  {
+    Assert.assertEquals(100, computeMinSegmentsToMove(131_071)); // each pair straddles a multiple of 65_536
+    Assert.assertEquals(200, computeMinSegmentsToMove(131_072));
+
+    Assert.assertEquals(500, computeMinSegmentsToMove(393_215));
+    Assert.assertEquals(600, computeMinSegmentsToMove(393_216));
+
+    Assert.assertEquals(900, computeMinSegmentsToMove(655_359));
+    Assert.assertEquals(1_000, computeMinSegmentsToMove(655_360));
+
+    Assert.assertEquals(9_900, computeMinSegmentsToMove(6_553_599));
+    Assert.assertEquals(10_000, computeMinSegmentsToMove(6_553_600));
+  }
+
+  @Test
+  public void testMinSegmentsArePickedForMoveWhenNoSkew()
+  {
+    final List<ServerHolder> historicals = Arrays.asList(
+        createServer("A", WIKI_SEGMENTS),
+        createServer("B", WIKI_SEGMENTS)
+    );
+
+    final int minSegmentsToMove = SegmentToMoveCalculator.computeMinSegmentsToMoveInTier(20_000);
+    Assert.assertEquals(100, minSegmentsToMove);
+
+    final int segmentsToMoveToFixSkew = SegmentToMoveCalculator
+        .computeNumSegmentsToMoveToBalanceTier(TIER, historicals);
+    Assert.assertEquals(0, segmentsToMoveToFixSkew);
+
+    // Find segmentsToMove with no limit on maxSegmentsToMove
+    final int segmentsToMove = SegmentToMoveCalculator
+        .computeNumSegmentsToMoveInTier(TIER, historicals, Integer.MAX_VALUE);
+
+    Assert.assertEquals(minSegmentsToMove, segmentsToMove);
+  }
+
+  @Test
+  public void testHalfSegmentsArePickedForMoveWhenFullSkew()
+  {
+    final List<ServerHolder> historicals = Arrays.asList(
+        createServer("A", WIKI_SEGMENTS),
+        createServer("B", Collections.emptyList())
+    );
+
+    final int minSegmentsToMove = SegmentToMoveCalculator.computeMinSegmentsToMoveInTier(10_000);
+    Assert.assertEquals(100, minSegmentsToMove);
+
+    final int segmentsToMoveToFixSkew = SegmentToMoveCalculator
+        .computeNumSegmentsToMoveToBalanceTier(TIER, historicals);
+    Assert.assertEquals(5_000, segmentsToMoveToFixSkew);
+
+    // Find segmentsToMove with no limit on maxSegmentsToMove
+    final int segmentsToMove = SegmentToMoveCalculator
+        .computeNumSegmentsToMoveInTier(TIER, historicals, Integer.MAX_VALUE);
+
+    Assert.assertEquals(segmentsToMoveToFixSkew, segmentsToMove);
+  }
+
+  @Test
+  public void testDatasourceWithLargestGapDeterminesNumToBalanceCounts()
+  {
+    // Both servers have all koala segments but only A has wiki segments
+    List<DataSegment> segmentsForServerA = new ArrayList<>(WIKI_SEGMENTS);
+    segmentsForServerA.addAll(KOALA_SEGMENTS);
+
+    final List<ServerHolder> historicals = Arrays.asList(
+        createServer("A", segmentsForServerA),
+        createServer("B", KOALA_SEGMENTS)
+    );
+
+    // Verify that half the wiki segments need to be moved for balance
+    int numToMoveToBalanceCount = SegmentToMoveCalculator
+        .computeSegmentsToMoveToBalanceCountsPerDatasource(TIER, historicals);
+    Assert.assertEquals(WIKI_SEGMENTS.size() / 2, numToMoveToBalanceCount);
+  }
+
+  private static int computeMaxSegmentsToMove(int totalSegments, int numThreads)
+  {
+    return SegmentToMoveCalculator.computeMaxSegmentsToMovePerTier(
+        totalSegments,
+        numThreads,
+        DEFAULT_COORDINATOR_PERIOD
+    );
+  }
+
+  private static int computeMaxSegmentsToMoveInPeriod(int totalSegments, Duration coordinatorPeriod)
+  {
+    return SegmentToMoveCalculator.computeMaxSegmentsToMovePerTier(totalSegments, 1, coordinatorPeriod);
+  }
+
+  private static int computeNumThreadsAndMaxToMove(int totalSegments)
+  {
+    return SegmentToMoveCalculator.computeMaxSegmentsToMovePerTier(
+        totalSegments,
+        SegmentLoadingConfig.computeNumBalancerThreads(totalSegments), // thread count derived from the total
+        DEFAULT_COORDINATOR_PERIOD
+    );
+  }
+
+  private static int computeMinSegmentsToMove(int totalSegmentsInTier)
+  {
+    return SegmentToMoveCalculator.computeMinSegmentsToMoveInTier(totalSegmentsInTier);
+  }
+
+  private static ServerHolder createServer(String name, List<DataSegment> segments)
+  {
+    final DruidServer server // created in TIER, the same tier the tests pass to the calculator
+        = new DruidServer(name, name, null, 10L << 30, ServerType.HISTORICAL, TIER, 1);
+    segments.forEach(server::addDataSegment);
+    return new ServerHolder(server.toImmutableDruidServer(), new TestLoadQueuePeon());
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
index c608f58d9a4..aada7b3214b 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
@@ -38,6 +38,7 @@ import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.joda.time.DateTime;
+import org.joda.time.Duration;
 import org.joda.time.Interval;
 import org.junit.After;
 import org.junit.Assert;
@@ -378,7 +379,7 @@ public class BalanceSegmentsTest
 
   private CoordinatorRunStats runBalancer(DruidCoordinatorRuntimeParams params)
   {
-    params = new BalanceSegments().run(params);
+    params = new BalanceSegments(Duration.standardMinutes(1)).run(params);
     if (params == null) {
       Assert.fail("BalanceSegments duty returned null params");
       return new CoordinatorRunStats();
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfigTest.java
new file mode 100644
index 00000000000..947ac45ca7f
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfigTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.loading;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Checks the thread counts computed by {@link SegmentLoadingConfig#computeNumBalancerThreads}. */
+public class SegmentLoadingConfigTest
+{
+  @Test
+  public void testComputeNumBalancerThreads()
+  {
+    Assert.assertEquals(1, computeBalancerThreads(0));
+    Assert.assertEquals(1, computeBalancerThreads(30_000));
+    Assert.assertEquals(2, computeBalancerThreads(50_000));
+    Assert.assertEquals(3, computeBalancerThreads(100_000));
+
+    Assert.assertEquals(4, computeBalancerThreads(175_000));
+    Assert.assertEquals(5, computeBalancerThreads(250_000));
+    Assert.assertEquals(6, computeBalancerThreads(350_000));
+    Assert.assertEquals(7, computeBalancerThreads(450_000));
+    Assert.assertEquals(8, computeBalancerThreads(600_000));
+
+    Assert.assertEquals(8, computeBalancerThreads(1_000_000));
+    Assert.assertEquals(8, computeBalancerThreads(10_000_000));
+  }
+
+  /** Shorthand for {@link SegmentLoadingConfig#computeNumBalancerThreads}; static as it uses no test state. */
+  private static int computeBalancerThreads(int numUsedSegments)
+  {
+    return SegmentLoadingConfig.computeNumBalancerThreads(numUsedSegments);
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
index 984fe217afb..bd1af4a084a 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
@@ -217,8 +217,8 @@ public abstract class CoordinatorSimulationBaseTest implements
   {
     static final String ASSIGNED_COUNT = "segment/assigned/count";
     static final String MOVED_COUNT = "segment/moved/count";
+    static final String MOVE_SKIPPED = "segment/moveSkipped/count"; // checked by SegmentBalancingTest
     static final String DROPPED_COUNT = "segment/dropped/count";
-    static final String OVERSHADOWED_COUNT = "segment/overshadowed/count";
     static final String DELETED_COUNT = "segment/deleted/count";
     static final String LOAD_QUEUE_COUNT = "segment/loadQueue/count";
     static final String DROP_QUEUE_COUNT = "segment/dropQueue/count";
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index c5d5d94293f..e40bf8ee168 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -74,7 +74,6 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public class CoordinatorSimulationBuilder
 {
-  private static final long DEFAULT_COORDINATOR_PERIOD = 100L;
   private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper()
       .setInjectableValues(
           new InjectableValues.Std().addValue(
@@ -463,8 +462,8 @@ public class CoordinatorSimulationBuilder
 
       this.coordinatorConfig = new TestDruidCoordinatorConfig.Builder()
           .withCoordinatorStartDelay(new Duration(1L))
-          .withCoordinatorPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD))
-          .withCoordinatorKillPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD))
+          .withCoordinatorPeriod(Duration.standardMinutes(1)) // NOTE(review): presumably feeds BalanceSegments; confirm
+          .withCoordinatorKillPeriod(Duration.millis(100)) // 100 ms, the value of the removed DEFAULT_COORDINATOR_PERIOD
           .withLoadQueuePeonType("http")
           .withCoordinatorKillIgnoreDurationToRetain(false)
           .build();
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
index 383b5fe3c64..141c6ba7a11 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
@@ -26,6 +26,8 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * Coordinator simulation test to verify behaviour of segment balancing.
@@ -229,4 +231,39 @@ public class SegmentBalancingTest extends CoordinatorSimulationBaseTest
     Assert.assertTrue(getValue(Metric.MOVED_COUNT, null).intValue() > 0);
   }
 
+  @Test(timeout = 60000L)
+  public void testMaxSegmentsAreMovedWhenClusterIsSkewed()
+  {
+    // Start with historicals 1 to 10 in tier T1, each created with size 1_000_000
+    // (1 TB according to the original author; presumably the size is given in MB)
+    final long serverSize = 1_000_000;
+    // NOTE(review): raw List, the element type argument appears to be missing here
+    List initialServers = IntStream.rangeClosed(1, 10)
+                                   .mapToObj(id -> createHistorical(id, Tier.T1, serverSize))
+                                   .collect(Collectors.toList());
+
+    final CoordinatorSimulation simulation = CoordinatorSimulation
+        .builder()
+        .withSegments(Segments.KOALA_100X100D)
+        .withServers(initialServers)
+        .withRules(DS.KOALA, Load.on(Tier.T1, 1).forever())
+        .build();
+
+    startSimulation(simulation);
+
+    // Run 1: All segments are assigned to the 10 historicals
+    runCoordinatorCycle();
+    verifyValue(Metric.ASSIGNED_COUNT, 10_000L);
+    verifyNotEmitted(Metric.MOVED_COUNT);
+    verifyValue(Metric.MOVE_SKIPPED, 100L);
+
+    // Run 2: Historicals 11 to 20 join the tier and some segments are moved to them
+    IntStream.rangeClosed(11, 20)
+             .forEach(id -> addServer(createHistorical(id, Tier.T1, serverSize)));
+
+    runCoordinatorCycle();
+    verifyValue(Metric.MOVED_COUNT, 500L);
+    verifyNotEmitted(Metric.MOVE_SKIPPED);
+  }
+
 }