diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 0c5e79bdcbe..c9d334968ef 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -20,6 +20,7 @@ package org.apache.druid.server.coordinator; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; @@ -98,6 +99,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -672,7 +674,7 @@ public class DruidCoordinator { try { log.info("Starting coordinator run for group [%s]", dutyGroupName); - final long globalStart = System.currentTimeMillis(); + final Stopwatch groupRunTime = Stopwatch.createStarted(); synchronized (lock) { if (!coordLeaderSelector.isLeader()) { @@ -719,23 +721,25 @@ public class DruidCoordinator log.info("Coordination has been paused. Duties will not run until coordination is resumed."); } + final Stopwatch dutyRunTime = Stopwatch.createUnstarted(); for (CoordinatorDuty duty : duties) { // Don't read state and run state in the same duty otherwise racy conditions may exist if (!coordinationPaused && coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) { - final long start = System.currentTimeMillis(); + dutyRunTime.reset().start(); params = duty.run(params); - final long end = System.currentTimeMillis(); + dutyRunTime.stop(); final String dutyName = duty.getClass().getName(); if (params == null) { log.info("Stopping run for group [%s] on request of duty [%s].", dutyGroupName, dutyName); return; } else { - final RowKey rowKey = RowKey.builder().add(Dimension.DUTY, dutyName).build(); - params.getCoordinatorStats().add(Stats.CoordinatorRun.DUTY_RUN_TIME, rowKey, end - start); + final RowKey rowKey = RowKey.of(Dimension.DUTY, dutyName); + final long dutyRunMillis = dutyRunTime.elapsed(TimeUnit.MILLISECONDS); + params.getCoordinatorStats().add(Stats.CoordinatorRun.DUTY_RUN_TIME, rowKey, dutyRunMillis); } } } @@ -745,9 +749,9 @@ public class DruidCoordinator if (allStats.rowCount() > 0) { final AtomicInteger emittedCount = new AtomicInteger(); allStats.forEachStat( - (dimensionValues, stat, value) -> { + (stat, dimensions, value) -> { if (stat.shouldEmit()) { - emitStat(stat, dimensionValues, value); + emitStat(stat, dimensions.getValues(), value); emittedCount.incrementAndGet(); } } @@ -760,7 +764,7 @@ public class DruidCoordinator } // Emit the runtime of the full DutiesRunnable - final long runMillis = System.currentTimeMillis() - globalStart; + final long runMillis = groupRunTime.stop().elapsed(TimeUnit.MILLISECONDS); emitStat(Stats.CoordinatorRun.GROUP_RUN_TIME, Collections.emptyMap(), runMillis); log.info("Finished coordinator run for group [%s] in [%d] ms", dutyGroupName, runMillis); } @@ -771,10 +775,6 @@ public class DruidCoordinator private void emitStat(CoordinatorStat stat, Map dimensionValues, long value) { - if (stat.equals(Stats.Balancer.NORMALIZED_COST_X_1000)) { - value = value / 1000; - } - ServiceMetricEvent.Builder eventBuilder = new ServiceMetricEvent.Builder() .setDimension(Dimension.DUTY_GROUP.reportedName(), dutyGroupName); dimensionValues.forEach( diff --git a/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java b/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java new file mode 100644 index 00000000000..ea1f81ee6d4 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2IntMaps; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; + +import java.util.HashMap; +import java.util.Map; + +/** + * Maintains a count of segments for each datasource and interval. + */ +public class SegmentCountsPerInterval +{ + private final Map> + datasourceIntervalToSegmentCount = new HashMap<>(); + private final Object2IntMap intervalToTotalSegmentCount = new Object2IntOpenHashMap<>(); + + public void addSegment(DataSegment segment) + { + updateCountInInterval(segment, 1); + } + + public void removeSegment(DataSegment segment) + { + updateCountInInterval(segment, -1); + } + + public Object2IntMap getIntervalToSegmentCount(String datasource) + { + return datasourceIntervalToSegmentCount.getOrDefault(datasource, Object2IntMaps.emptyMap()); + } + + public Object2IntMap getIntervalToTotalSegmentCount() + { + return intervalToTotalSegmentCount; + } + + private void updateCountInInterval(DataSegment segment, int delta) + { + intervalToTotalSegmentCount.mergeInt(segment.getInterval(), delta, Integer::sum); + datasourceIntervalToSegmentCount + .computeIfAbsent(segment.getDataSource(), ds -> new Object2IntOpenHashMap<>()) + .mergeInt(segment.getInterval(), delta, Integer::sum); + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java index 79f2c38ab20..7947f1c1b32 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java @@ -32,11 +32,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; /** @@ -75,11 +73,7 @@ public class ServerHolder implements Comparable */ private final Map queuedSegments = new HashMap<>(); - /** - * Segments that are expected to be loaded on this server once all the - * operations in progress have completed. - */ - private final Set projectedSegments = new HashSet<>(); + private final SegmentCountsPerInterval projectedSegments = new SegmentCountsPerInterval(); public ServerHolder(ImmutableDruidServer server, LoadQueuePeon peon) { @@ -133,31 +127,31 @@ public class ServerHolder implements Comparable AtomicInteger loadingReplicaCount ) { - projectedSegments.addAll(server.iterateAllSegments()); + for (DataSegment segment : server.iterateAllSegments()) { + projectedSegments.addSegment(segment); + } final List expiredSegments = new ArrayList<>(); - peon.getSegmentsInQueue().forEach( - (holder) -> { - int runsInQueue = holder.incrementAndGetRunsInQueue(); - if (runsInQueue > maxLifetimeInQueue) { - expiredSegments.add(holder); - } + for (SegmentHolder holder : peon.getSegmentsInQueue()) { + int runsInQueue = holder.incrementAndGetRunsInQueue(); + if (runsInQueue > maxLifetimeInQueue) { + expiredSegments.add(holder); + } - final SegmentAction action = holder.getAction(); - addToQueuedSegments(holder.getSegment(), simplify(action)); + final SegmentAction action = holder.getAction(); + addToQueuedSegments(holder.getSegment(), simplify(action)); - if (action == SegmentAction.MOVE_TO) { - movingSegmentCount.incrementAndGet(); - } - if (action == SegmentAction.REPLICATE) { - loadingReplicaCount.incrementAndGet(); - } - } - ); + if (action == SegmentAction.MOVE_TO) { + movingSegmentCount.incrementAndGet(); + } + if (action == SegmentAction.REPLICATE) { + loadingReplicaCount.incrementAndGet(); + } + } - peon.getSegmentsMarkedToDrop().forEach( - segment -> addToQueuedSegments(segment, SegmentAction.MOVE_FROM) - ); + for (DataSegment segment : peon.getSegmentsMarkedToDrop()) { + addToQueuedSegments(segment, SegmentAction.MOVE_FROM); + } if (!expiredSegments.isEmpty()) { List expiredSegmentsSubList = @@ -251,11 +245,21 @@ public class ServerHolder implements Comparable * Segments that are expected to be loaded on this server once all the * operations in progress have completed. */ - public Set getProjectedSegments() + public SegmentCountsPerInterval getProjectedSegments() { return projectedSegments; } + public boolean isProjectedSegment(DataSegment segment) + { + SegmentAction action = getActionOnSegment(segment); + if (action == null) { + return hasSegmentLoaded(segment.getId()); + } else { + return action.isLoad(); + } + } + /** * Segments that are currently in the queue for being loaded on this server. * This does not include segments that are being moved to this server. @@ -362,10 +366,10 @@ public class ServerHolder implements Comparable // Add to projected if load is started, remove from projected if drop has started if (action.isLoad()) { - projectedSegments.add(segment); + projectedSegments.addSegment(segment); sizeOfLoadingSegments += segment.getSize(); } else { - projectedSegments.remove(segment); + projectedSegments.removeSegment(segment); if (action == SegmentAction.DROP) { sizeOfDroppingSegments += segment.getSize(); } @@ -379,10 +383,10 @@ public class ServerHolder implements Comparable queuedSegments.remove(segment); if (action.isLoad()) { - projectedSegments.remove(segment); + projectedSegments.removeSegment(segment); sizeOfLoadingSegments -= segment.getSize(); } else { - projectedSegments.add(segment); + projectedSegments.addSegment(segment); if (action == SegmentAction.DROP) { sizeOfDroppingSegments -= segment.getSize(); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategy.java index 7f27648d3f8..e133430b1b2 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategy.java @@ -20,70 +20,60 @@ package org.apache.druid.server.coordinator.balancer; import org.apache.druid.server.coordinator.ServerHolder; -import org.apache.druid.server.coordinator.duty.BalanceSegments; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; import org.apache.druid.timeline.DataSegment; import javax.annotation.Nullable; import java.util.Iterator; import java.util.List; -import java.util.NavigableSet; /** - * This interface describes the coordinator balancing strategy, which is responsible for making decisions on where - * to place {@link DataSegment}s on historical servers (described by {@link ServerHolder}). The balancing strategy - * is used by {@link org.apache.druid.server.coordinator.rules.LoadRule} to assign and drop segments, and by - * {@link BalanceSegments} to migrate segments between historicals. + * Segment balancing strategy, used in every coordinator run by + * {@link org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner} + * to choose optimal servers to load, move or drop a segment. */ public interface BalancerStrategy { /** - * Finds the best server to move a segment to according to the balancing strategy. + * Finds the best server from the list of {@code destinationServers} to load + * the {@code segmentToMove}, if it is moved from the {@code sourceServer}. + *

+ * In order to avoid unnecessary moves when the segment is already optimally placed, + * include the {@code sourceServer} in the list of {@code destinationServers}. * - * @param proposalSegment segment to move - * @param sourceServer Server the segment is currently placed on. - * @param destinationServers servers to consider as move destinations - * @return The server to move to, or null if no move should be made or no server is suitable + * @return The server to move to, or null if the segment is already optimally placed. */ @Nullable ServerHolder findDestinationServerToMoveSegment( - DataSegment proposalSegment, + DataSegment segmentToMove, ServerHolder sourceServer, List destinationServers ); /** - * Finds the best servers on which to place the {@code proposalSegment}. - * This method can be used both for placing the first copy of a segment - * in the tier or a replica of the segment. + * Finds the best servers to load the given segment. This method can be used + * both for placing the first copy of a segment in a tier or a replica of an + * already available segment. * - * @param proposalSegment segment to place on servers - * @param serverHolders servers to consider as segment homes - * @return Iterator over the best servers (in order) on which the segment - * can be placed. + * @return Iterator over the best servers (in order of preference) to load + * the segment. */ Iterator findServersToLoadSegment( - DataSegment proposalSegment, + DataSegment segmentToLoad, List serverHolders ); /** - * Returns an iterator for a set of servers to drop from, ordered by preference of which server to drop from first - * for a given drop strategy. One or more segments may be dropped, depending on how much the segment is - * over-replicated. - * @param toDropSegment segment to drop from one or more servers - * @param serverHolders set of historicals to consider dropping from - * @return Iterator for set of historicals, ordered by drop preference + * Finds the best servers to drop the given segment. + * + * @return Iterator over the servers (in order of preference) to drop the segment */ - Iterator pickServersToDropSegment(DataSegment toDropSegment, NavigableSet serverHolders); + Iterator findServersToDropSegment(DataSegment segmentToDrop, List serverHolders); /** - * Add balancing strategy stats during the 'balanceTier' operation of - * {@link BalanceSegments} to be included - * @param tier historical tier being balanced - * @param stats stats object to add balancing strategy stats to - * @param serverHolderList servers in tier being balanced + * Returns the stats collected by the strategy in the current run and resets + * the stats collector for the next run. */ - void emitStats(String tier, CoordinatorRunStats stats, List serverHolderList); + CoordinatorRunStats getAndResetStats(); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategy.java index 424657991cf..eda99289157 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategy.java @@ -28,10 +28,8 @@ import org.apache.druid.timeline.DataSegment; import java.util.Collections; import java.util.Set; - public class CachingCostBalancerStrategy extends CostBalancerStrategy { - private final ClusterCostCache clusterCostCache; public CachingCostBalancerStrategy(ClusterCostCache clusterCostCache, ListeningExecutorService exec) @@ -41,13 +39,8 @@ public class CachingCostBalancerStrategy extends CostBalancerStrategy } @Override - protected double computeCost(DataSegment proposalSegment, ServerHolder server, boolean includeCurrentServer) + protected double computePlacementCost(DataSegment proposalSegment, ServerHolder server) { - // (optional) Don't include server if it cannot load the segment - if (!includeCurrentServer && !server.canLoadSegment(proposalSegment)) { - return Double.POSITIVE_INFINITY; - } - final String serverName = server.getServer().getName(); double cost = clusterCostCache.computeCost(serverName, proposalSegment); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java index 726dffe6904..0ddacaead70 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java @@ -125,6 +125,10 @@ public class CachingCostBalancerStrategyFactory implements BalancerStrategyFacto @Override public BalancerStrategy createBalancerStrategy(final ListeningExecutorService exec) { + LOG.warn( + "'cachingCost' balancer strategy has been deprecated as it can lead to" + + " unbalanced clusters. Use 'cost' strategy instead." + ); if (!isInitialized() && config.isAwaitInitialization()) { try { final long startMillis = System.currentTimeMillis(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java index 6d165c1f349..aae907bb599 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java @@ -19,16 +19,21 @@ package org.apache.druid.server.coordinator.balancer; +import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import org.apache.commons.math3.util.FastMath; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.server.coordinator.SegmentCountsPerInterval; import org.apache.druid.server.coordinator.ServerHolder; +import org.apache.druid.server.coordinator.loading.SegmentAction; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.server.coordinator.stats.Dimension; import org.apache.druid.server.coordinator.stats.RowKey; import org.apache.druid.server.coordinator.stats.Stats; import org.apache.druid.timeline.DataSegment; @@ -38,11 +43,10 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; import java.util.List; -import java.util.NavigableSet; import java.util.PriorityQueue; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; public class CostBalancerStrategy implements BalancerStrategy { @@ -63,6 +67,20 @@ public class CostBalancerStrategy implements BalancerStrategy = Comparator., Double>comparing(pair -> pair.lhs) .thenComparing(pair -> pair.rhs); + private final CoordinatorRunStats stats = new CoordinatorRunStats(); + + public static double computeJointSegmentsCost(DataSegment segment, Iterable segmentSet) + { + final Interval costComputeInterval = getCostComputeInterval(segment); + double totalCost = 0; + for (DataSegment s : segmentSet) { + if (costComputeInterval.overlaps(s.getInterval())) { + totalCost += computeJointSegmentsCost(segment, s); + } + } + return totalCost; + } + /** * This defines the unnormalized cost function between two segments. * @@ -83,15 +101,20 @@ public class CostBalancerStrategy implements BalancerStrategy final Interval intervalA = segmentA.getInterval(); final Interval intervalB = segmentB.getInterval(); + // constant cost-multiplier for segments of the same datsource + final double multiplier = segmentA.getDataSource().equals(segmentB.getDataSource()) + ? 2.0 : 1.0; + return intervalCost(intervalA, intervalB) * multiplier; + } + + public static double intervalCost(Interval intervalA, Interval intervalB) + { final double t0 = intervalA.getStartMillis(); final double t1 = (intervalA.getEndMillis() - t0) / MILLIS_FACTOR; final double start = (intervalB.getStartMillis() - t0) / MILLIS_FACTOR; final double end = (intervalB.getEndMillis() - t0) / MILLIS_FACTOR; - // constant cost-multiplier for segments of the same datsource - final double multiplier = segmentA.getDataSource().equals(segmentB.getDataSource()) ? 2.0 : 1.0; - - return INV_LAMBDA_SQUARE * intervalCost(t1, start, end) * multiplier; + return INV_LAMBDA_SQUARE * intervalCost(t1, start, end); } /** @@ -199,161 +222,119 @@ public class CostBalancerStrategy implements BalancerStrategy @Override public Iterator findServersToLoadSegment( - DataSegment proposalSegment, + DataSegment segmentToLoad, List serverHolders ) { - return getServersByPlacementCost(proposalSegment, serverHolders, false, "findServersToLoadSegment"); + return orderServersByPlacementCost(segmentToLoad, serverHolders, SegmentAction.LOAD) + .stream() + .filter(server -> server.canLoadSegment(segmentToLoad)) + .iterator(); } - @Override public ServerHolder findDestinationServerToMoveSegment( - DataSegment proposalSegment, + DataSegment segmentToMove, ServerHolder sourceServer, List serverHolders ) { - Iterator servers = - getServersByPlacementCost(proposalSegment, serverHolders, true, "findServerToMoveSegment"); - return servers.hasNext() ? servers.next() : null; - } - - public static double computeJointSegmentsCost(DataSegment segment, Iterable segmentSet) - { - double totalCost = 0; - for (DataSegment s : segmentSet) { - totalCost += computeJointSegmentsCost(segment, s); + List servers = + orderServersByPlacementCost(segmentToMove, serverHolders, SegmentAction.MOVE_TO); + if (servers.isEmpty()) { + return null; } - return totalCost; + + ServerHolder candidateServer = servers.get(0); + return candidateServer.equals(sourceServer) ? null : candidateServer; } @Override - public Iterator pickServersToDropSegment( + public Iterator findServersToDropSegment( DataSegment segmentToDrop, - NavigableSet serverHolders + List serverHolders ) { - List serversByCost = Lists.newArrayList( - getServersByPlacementCost(segmentToDrop, serverHolders, true, "pickServersToDropSegment") - ); + List serversByCost = + orderServersByPlacementCost(segmentToDrop, serverHolders, SegmentAction.DROP); // Prioritize drop from highest cost servers return Lists.reverse(serversByCost).iterator(); } - /** - * Calculates the initial cost of the Druid segment configuration. - * - * @param serverHolders A list of ServerHolders for a particular tier. - * - * @return The initial cost of the Druid tier. - */ - public double calculateInitialTotalCost(final List serverHolders) - { - double cost = 0; - for (ServerHolder server : serverHolders) { - // segments are dumped into an array because it's probably better than iterating the iterateAllSegments() result - // quadratically in a loop, which can generate garbage in the form of Stream, Spliterator, Iterator, etc. objects - // whose total memory volume exceeds the size of the DataSegment array. - DataSegment[] segments = server.getServer().iterateAllSegments().toArray(new DataSegment[0]); - for (DataSegment s1 : segments) { - for (DataSegment s2 : segments) { - cost += computeJointSegmentsCost(s1, s2); - } - } - } - return cost; - } - - /** - * Calculates the cost normalization. This is such that the normalized cost is lower bounded - * by 1 (e.g. when each segment gets its own historical node). - * - * @param serverHolders A list of ServerHolders for a particular tier. - * - * @return The normalization value (the sum of the diagonal entries in the - * pairwise cost matrix). This is the cost of a cluster if each - * segment were to get its own historical node. - */ - public double calculateNormalization(final List serverHolders) - { - double cost = 0; - for (ServerHolder server : serverHolders) { - for (DataSegment segment : server.getServedSegments()) { - cost += computeJointSegmentsCost(segment, segment); - } - } - return cost; - } - @Override - public void emitStats(String tier, CoordinatorRunStats stats, List serverHolderList) + public CoordinatorRunStats getAndResetStats() { - final double initialTotalCost = calculateInitialTotalCost(serverHolderList); - final double normalization = calculateNormalization(serverHolderList); - final double normalizedInitialCost = initialTotalCost / normalization; - - final RowKey rowKey = RowKey.forTier(tier); - stats.add(Stats.Balancer.RAW_COST, rowKey, (long) initialTotalCost); - stats.add(Stats.Balancer.NORMALIZATION_COST, rowKey, (long) normalization); - stats.add(Stats.Balancer.NORMALIZED_COST_X_1000, rowKey, (long) (normalizedInitialCost * 1000)); - - log.info( - "[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f]", - tier, initialTotalCost, normalization, normalizedInitialCost - ); + return stats.getSnapshotAndReset(); } - protected double computeCost( - final DataSegment proposalSegment, - final ServerHolder server, - final boolean includeCurrentServer - ) + /** + * Computes the cost of placing a segment on this server. + */ + protected double computePlacementCost(DataSegment proposalSegment, ServerHolder server) { - // (optional) Don't include server if it cannot load the segment - if (!includeCurrentServer && !server.canLoadSegment(proposalSegment)) { - return Double.POSITIVE_INFINITY; - } + final Interval costComputeInterval = getCostComputeInterval(proposalSegment); - // The contribution to the total cost of a given server by proposing to move the segment to that server is... - double cost = 0d; + // Compute number of segments in each interval + final Object2IntOpenHashMap intervalToSegmentCount = new Object2IntOpenHashMap<>(); - // the sum of the costs of segments expected to be on the server (loaded + loading - dropping) - Set projectedSegments = server.getProjectedSegments(); - cost += computeJointSegmentsCost(proposalSegment, projectedSegments); + final SegmentCountsPerInterval projectedSegments = server.getProjectedSegments(); + projectedSegments.getIntervalToTotalSegmentCount().object2IntEntrySet().forEach(entry -> { + final Interval interval = entry.getKey(); + if (costComputeInterval.overlaps(interval)) { + intervalToSegmentCount.addTo(interval, entry.getIntValue()); + } + }); - // minus the self cost of the segment - if (projectedSegments.contains(proposalSegment)) { - cost -= computeJointSegmentsCost(proposalSegment, proposalSegment); + // Count the segments for the same datasource twice as they have twice the cost + final String datasource = proposalSegment.getDataSource(); + projectedSegments.getIntervalToSegmentCount(datasource).object2IntEntrySet().forEach(entry -> { + final Interval interval = entry.getKey(); + if (costComputeInterval.overlaps(interval)) { + intervalToSegmentCount.addTo(interval, entry.getIntValue()); + } + }); + + // Compute joint cost for each interval + double cost = 0; + final Interval segmentInterval = proposalSegment.getInterval(); + cost += intervalToSegmentCount.object2IntEntrySet().stream().mapToDouble( + entry -> intervalCost(segmentInterval, entry.getKey()) + * entry.getIntValue() + ).sum(); + + // Minus the self cost of the segment + if (server.isProjectedSegment(proposalSegment)) { + cost -= intervalCost(segmentInterval, segmentInterval) * 2.0; } return cost; } /** - * Returns an iterator over the servers, ordered by increasing cost for - * placing the given segment on that server. - * - * @param includeCurrentServer true if the server already serving a replica - * of this segment should be included in the results + * Orders the servers by increasing cost for placing the given segment. */ - private Iterator getServersByPlacementCost( - DataSegment proposalSegment, - Iterable serverHolders, - boolean includeCurrentServer, - String action + private List orderServersByPlacementCost( + DataSegment segment, + List serverHolders, + SegmentAction action ) { + final Stopwatch computeTime = Stopwatch.createStarted(); final List>> futures = new ArrayList<>(); for (ServerHolder server : serverHolders) { futures.add( exec.submit( - () -> Pair.of(computeCost(proposalSegment, server, includeCurrentServer), server) + () -> Pair.of(computePlacementCost(segment, server), server) ) ); } + String tier = serverHolders.isEmpty() ? null : serverHolders.get(0).getServer().getTier(); + final RowKey metricKey = RowKey.with(Dimension.TIER, tier) + .with(Dimension.DATASOURCE, segment.getDataSource()) + .and(Dimension.DESCRIPTION, action.name()); + final PriorityQueue> costPrioritizedServers = new PriorityQueue<>(CHEAPEST_SERVERS_FIRST); try { @@ -364,30 +345,57 @@ public class CostBalancerStrategy implements BalancerStrategy ); } catch (Exception e) { - alertOnFailure(e, action); + stats.add(Stats.Balancer.COMPUTATION_ERRORS, metricKey, 1); + handleFailure(e, segment, action); } - // Include current server only if specified - return costPrioritizedServers.stream() - .filter(pair -> includeCurrentServer || pair.rhs.canLoadSegment(proposalSegment)) - .map(pair -> pair.rhs).iterator(); + // Report computation stats + computeTime.stop(); + stats.add(Stats.Balancer.COMPUTATION_COUNT, metricKey, 1); + stats.add(Stats.Balancer.COMPUTATION_TIME, metricKey, computeTime.elapsed(TimeUnit.MILLISECONDS)); + + return costPrioritizedServers.stream().map(pair -> pair.rhs) + .collect(Collectors.toList()); } - private void alertOnFailure(Exception e, String action) + private void handleFailure( + Exception e, + DataSegment segment, + SegmentAction action + ) { - // Do not alert if the executor has been shutdown + final String reason; + String suggestion = ""; if (exec.isShutdown()) { - log.noStackTrace().info("Balancer executor was terminated. Failing action [%s].", action); - return; + reason = "Executor shutdown"; + } else if (e instanceof TimeoutException) { + reason = "Timed out"; + suggestion = " Try setting a higher value for 'balancerComputeThreads'."; + } else { + reason = e.getMessage(); } - final boolean hasTimedOut = e instanceof TimeoutException; - final String message = StringUtils.format( - "Cost balancer strategy %s in action [%s].%s", - hasTimedOut ? "timed out" : "failed", action, - hasTimedOut ? " Try setting a higher value of 'balancerComputeThreads'." : "" - ); - log.makeAlert(e, message).emit(); + String msgFormat = "Cost strategy computations failed for action[%s] on segment[%s] due to reason[%s].[%s]"; + log.noStackTrace().warn(e, msgFormat, action, segment.getId(), reason, suggestion); + } + + /** + * The cost compute interval for a segment is {@code [start-45days, end+45days)}. + * This is because the joint cost of any two segments that are 45 days apart is + * negligible. + */ + private static Interval getCostComputeInterval(DataSegment segment) + { + final Interval segmentInterval = segment.getInterval(); + if (Intervals.isEternity(segmentInterval)) { + return segmentInterval; + } else { + final long maxGap = TimeUnit.DAYS.toMillis(45); + return Intervals.utc( + segmentInterval.getStartMillis() - maxGap, + segmentInterval.getEndMillis() + maxGap + ); + } } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategy.java index cee292930cf..601e5b042e0 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategy.java @@ -35,13 +35,12 @@ public class DiskNormalizedCostBalancerStrategy extends CostBalancerStrategy * This ensures that all the hosts will have the same % disk utilization. */ @Override - protected double computeCost( + protected double computePlacementCost( final DataSegment proposalSegment, - final ServerHolder server, - final boolean includeCurrentServer + final ServerHolder server ) { - double cost = super.computeCost(proposalSegment, server, includeCurrentServer); + double cost = super.computePlacementCost(proposalSegment, server); if (cost == Double.POSITIVE_INFINITY) { return cost; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java index 98b1d8bdc70..cccc2518e83 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java @@ -27,7 +27,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.NavigableSet; import java.util.stream.Collectors; /** @@ -39,16 +38,18 @@ import java.util.stream.Collectors; */ public class RandomBalancerStrategy implements BalancerStrategy { + private static final CoordinatorRunStats EMPTY_STATS = new CoordinatorRunStats(); + @Override public Iterator findServersToLoadSegment( - DataSegment proposalSegment, + DataSegment segmentToLoad, List serverHolders ) { // Filter out servers which cannot load this segment final List usableServerHolders = serverHolders.stream() - .filter(server -> server.canLoadSegment(proposalSegment)) + .filter(server -> server.canLoadSegment(segmentToLoad)) .collect(Collectors.toList()); Collections.shuffle(usableServerHolders); return usableServerHolders.iterator(); @@ -56,7 +57,7 @@ public class RandomBalancerStrategy implements BalancerStrategy @Override public ServerHolder findDestinationServerToMoveSegment( - DataSegment proposalSegment, + DataSegment segmentToMove, ServerHolder sourceServer, List serverHolders ) @@ -66,7 +67,7 @@ public class RandomBalancerStrategy implements BalancerStrategy } @Override - public Iterator pickServersToDropSegment(DataSegment toDropSegment, NavigableSet serverHolders) + public Iterator findServersToDropSegment(DataSegment segmentToDrop, List serverHolders) { List serverList = new ArrayList<>(serverHolders); Collections.shuffle(serverList); @@ -74,7 +75,8 @@ public class RandomBalancerStrategy implements BalancerStrategy } @Override - public void emitStats(String tier, CoordinatorRunStats stats, List serverHolderList) + public CoordinatorRunStats getAndResetStats() { + return EMPTY_STATS; } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java index a5ab8d0d477..86e5b996293 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java @@ -19,7 +19,6 @@ package org.apache.druid.server.coordinator.balancer; -import com.google.common.collect.Lists; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; @@ -56,11 +55,9 @@ public class TierSegmentBalancer private final DruidCoordinatorRuntimeParams params; private final StrategicSegmentAssigner segmentAssigner; - private final BalancerStrategy strategy; private final SegmentLoadingConfig loadingConfig; private final CoordinatorRunStats runStats; - private final Set allServers; private final List activeServers; private final List decommissioningServers; private final int totalMaxSegmentsToMove; @@ -77,7 +74,6 @@ public class TierSegmentBalancer this.params = params; this.segmentAssigner = params.getSegmentAssigner(); - this.strategy = params.getBalancerStrategy(); this.loadingConfig = params.getSegmentLoadingConfig(); this.totalMaxSegmentsToMove = loadingConfig.getMaxSegmentsToMove(); this.runStats = segmentAssigner.getStats(); @@ -86,7 +82,6 @@ public class TierSegmentBalancer servers.stream().collect(Collectors.partitioningBy(ServerHolder::isDecommissioning)); this.decommissioningServers = partitions.get(true); this.activeServers = partitions.get(false); - this.allServers = servers; this.movingSegmentCount = activeServers.stream().mapToInt(ServerHolder::getNumMovingSegments).sum(); } @@ -128,10 +123,6 @@ public class TierSegmentBalancer "Moved [%d] segments out of max [%d] between active servers in tier [%s].", movedGeneralSegments, maxGeneralSegmentsToMove, tier ); - - if (loadingConfig.isEmitBalancingStats()) { - strategy.emitStats(tier, runStats, Lists.newArrayList(allServers)); - } } private int moveSegmentsFromTo( @@ -224,13 +215,9 @@ public class TierSegmentBalancer private void markUnmoved(String reason, DataSegment segment) { - final RowKey key - = RowKey.builder() - .add(Dimension.TIER, tier) - .add(Dimension.DATASOURCE, segment.getDataSource()) - .add(Dimension.DESCRIPTION, reason) - .build(); - + RowKey key = RowKey.with(Dimension.TIER, tier) + .with(Dimension.DATASOURCE, segment.getDataSource()) + .and(Dimension.DESCRIPTION, reason); runStats.add(Stats.Segments.MOVE_SKIPPED, key, 1); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java index 865fde2a0bc..4ff2e657438 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java @@ -24,6 +24,7 @@ import org.apache.druid.server.coordinator.DruidCluster; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.balancer.TierSegmentBalancer; import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig; +import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; /** * @@ -57,6 +58,11 @@ public class BalanceSegments implements CoordinatorDuty (tier, servers) -> new TierSegmentBalancer(tier, servers, params).run() ); + CoordinatorRunStats runStats = params.getCoordinatorStats(); + params.getBalancerStrategy() + .getAndResetStats() + .forEachStat(runStats::add); + return params; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java index b5825cb084f..5109ec3c3f2 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java @@ -72,7 +72,7 @@ public class CollectSegmentAndServerStats implements CoordinatorDuty final DruidCluster cluster = params.getDruidCluster(); cluster.getHistoricals().forEach((tier, historicals) -> { - final RowKey rowKey = RowKey.forTier(tier); + final RowKey rowKey = RowKey.of(Dimension.TIER, tier); stats.add(Stats.Tier.HISTORICAL_COUNT, rowKey, historicals.size()); long totalCapacity = historicals.stream().map(ServerHolder::getMaxSize).reduce(0L, Long::sum); stats.add(Stats.Tier.TOTAL_CAPACITY, rowKey, totalCapacity); @@ -80,20 +80,23 @@ public class CollectSegmentAndServerStats implements CoordinatorDuty // Collect load queue stats coordinator.getLoadManagementPeons().forEach((serverName, queuePeon) -> { - final RowKey rowKey = RowKey.builder().add(Dimension.SERVER, serverName).build(); + final RowKey rowKey = RowKey.of(Dimension.SERVER, serverName); stats.add(Stats.SegmentQueue.BYTES_TO_LOAD, rowKey, queuePeon.getSizeOfSegmentsToLoad()); stats.add(Stats.SegmentQueue.NUM_TO_LOAD, rowKey, queuePeon.getSegmentsToLoad().size()); stats.add(Stats.SegmentQueue.NUM_TO_DROP, rowKey, queuePeon.getSegmentsToDrop().size()); queuePeon.getAndResetStats().forEachStat( - (dimValues, stat, statValue) -> - stats.add(stat, createRowKeyForServer(serverName, dimValues), statValue) + (stat, key, statValue) -> + stats.add(stat, createRowKeyForServer(serverName, key.getValues()), statValue) ); }); coordinator.getDatasourceToUnavailableSegmentCount().forEach( - (dataSource, numUnavailable) -> - stats.addToDatasourceStat(Stats.Segments.UNAVAILABLE, dataSource, numUnavailable) + (dataSource, numUnavailable) -> stats.add( + Stats.Segments.UNAVAILABLE, + RowKey.of(Dimension.DATASOURCE, dataSource), + numUnavailable + ) ); coordinator.getTierToDatasourceToUnderReplicatedCount(false).forEach( @@ -108,17 +111,18 @@ public class CollectSegmentAndServerStats implements CoordinatorDuty (dataSource, timeline) -> { long totalSizeOfUsedSegments = timeline.iterateAllObjects().stream() .mapToLong(DataSegment::getSize).sum(); - stats.addToDatasourceStat(Stats.Segments.USED_BYTES, dataSource, totalSizeOfUsedSegments); - stats.addToDatasourceStat(Stats.Segments.USED, dataSource, timeline.getNumObjects()); + + RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, dataSource); + stats.add(Stats.Segments.USED_BYTES, datasourceKey, totalSizeOfUsedSegments); + stats.add(Stats.Segments.USED, datasourceKey, timeline.getNumObjects()); } ); } private RowKey createRowKeyForServer(String serverName, Map dimensionValues) { - final RowKey.Builder builder = RowKey.builder(); - dimensionValues.forEach(builder::add); - builder.add(Dimension.SERVER, serverName); + final RowKey.Builder builder = RowKey.with(Dimension.SERVER, serverName); + dimensionValues.forEach(builder::with); return builder.build(); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index d767c51a925..0381a70ba24 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -43,6 +43,7 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DruidCoordinatorConfig; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.server.coordinator.stats.Dimension; import org.apache.druid.server.coordinator.stats.RowKey; import org.apache.druid.server.coordinator.stats.Stats; import org.apache.druid.timeline.DataSegment; @@ -593,7 +594,7 @@ public class CompactSegments implements CoordinatorCustomDuty CoordinatorRunStats stats ) { - final RowKey rowKey = RowKey.forDatasource(dataSource); + final RowKey rowKey = RowKey.of(Dimension.DATASOURCE, dataSource); stats.add(Stats.Compaction.PENDING_BYTES, rowKey, autoCompactionSnapshot.getBytesAwaitingCompaction()); stats.add(Stats.Compaction.PENDING_SEGMENTS, rowKey, autoCompactionSnapshot.getSegmentCountAwaitingCompaction()); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java index 1a63083bf77..3cb9f0064cf 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java @@ -28,6 +28,8 @@ import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.ServerHolder; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.coordinator.stats.RowKey; import org.apache.druid.server.coordinator.stats.Stats; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; @@ -100,7 +102,8 @@ public class MarkOvershadowedSegmentsAsUnused implements CoordinatorDuty final CoordinatorRunStats stats = params.getCoordinatorStats(); datasourceToUnusedSegments.forEach( (datasource, unusedSegments) -> { - stats.addToDatasourceStat(Stats.Segments.OVERSHADOWED, datasource, unusedSegments.size()); + RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, datasource); + stats.add(Stats.Segments.OVERSHADOWED, datasourceKey, unusedSegments.size()); coordinator.markSegmentsAsUnused(datasource, unusedSegments); } ); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java index eb037d1212f..a5c4eb58041 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java @@ -67,7 +67,7 @@ public class RunRules implements CoordinatorDuty final Set overshadowed = params.getDataSourcesSnapshot().getOvershadowedSegments(); final Set usedSegments = params.getUsedSegments(); log.info( - "Applying retention rules on [%d] used segments, skipping [%d] overshadowed segments.", + "Applying retention rules on [%,d] used segments, skipping [%,d] overshadowed segments.", usedSegments.size(), overshadowed.size() ); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java index 9c133fce8c6..92dd4714a45 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java @@ -40,6 +40,8 @@ import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.server.coordinator.DruidCoordinatorConfig; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; import org.apache.druid.server.coordinator.stats.CoordinatorStat; +import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.coordinator.stats.RowKey; import org.apache.druid.server.coordinator.stats.Stats; import org.apache.druid.timeline.DataSegment; import org.jboss.netty.handler.codec.http.HttpHeaders; @@ -543,7 +545,9 @@ public class HttpLoadQueuePeon implements LoadQueuePeon private void incrementStat(SegmentHolder holder, RequestStatus status) { - stats.addToDatasourceStat(status.datasourceStat, holder.getSegment().getDataSource(), 1); + RowKey rowKey = RowKey.with(Dimension.DATASOURCE, holder.getSegment().getDataSource()) + .and(Dimension.DESCRIPTION, holder.getAction().name()); + stats.add(status.datasourceStat, rowKey, 1); } private void executeCallbacks(SegmentHolder holder, boolean success) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java index 559950c18e7..fbf867fd91f 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java @@ -61,7 +61,7 @@ public class SegmentLoadingConfig log.info( "Smart segment loading is enabled. Recomputed replicationThrottleLimit" - + " [%,d] (%d%% of used segments [%,d]) and maxSegmentsToMove [%d].", + + " [%,d] (%d%% of used segments [%,d]) and maxSegmentsToMove [%,d].", replicationThrottleLimit, throttlePercentage, numUsedSegments, maxSegmentsToMove ); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java index 4fc2e176f75..6546cce69c7 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java @@ -33,6 +33,7 @@ import org.apache.druid.server.coordinator.stats.RowKey; import org.apache.druid.server.coordinator.stats.Stats; import org.apache.druid.timeline.DataSegment; +import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -296,7 +297,7 @@ public class StrategicSegmentAssigner implements SegmentActionHandler private void reportTierCapacityStats(DataSegment segment, int requiredReplicas, String tier) { - final RowKey rowKey = RowKey.forTier(tier); + final RowKey rowKey = RowKey.of(Dimension.TIER, tier); stats.updateMax(Stats.Tier.REPLICATION_FACTOR, rowKey, requiredReplicas); stats.add(Stats.Tier.REQUIRED_CAPACITY, rowKey, segment.getSize() * requiredReplicas); } @@ -342,7 +343,8 @@ public class StrategicSegmentAssigner implements SegmentActionHandler public void deleteSegment(DataSegment segment) { loadQueueManager.deleteSegment(segment); - stats.addToDatasourceStat(Stats.Segments.DELETED, segment.getDataSource(), 1); + RowKey rowKey = RowKey.of(Dimension.DATASOURCE, segment.getDataSource()); + stats.add(Stats.Segments.DELETED, rowKey, 1); } /** @@ -429,9 +431,9 @@ public class StrategicSegmentAssigner implements SegmentActionHandler if (numToDrop > numDropsQueued) { remainingNumToDrop = numToDrop - numDropsQueued; Iterator serverIterator = - (useRoundRobinAssignment || eligibleLiveServers.size() >= remainingNumToDrop) + (useRoundRobinAssignment || eligibleLiveServers.size() <= remainingNumToDrop) ? eligibleLiveServers.iterator() - : strategy.pickServersToDropSegment(segment, eligibleLiveServers); + : strategy.findServersToDropSegment(segment, new ArrayList<>(eligibleLiveServers)); numDropsQueued += dropReplicasFromServers(remainingNumToDrop, segment, serverIterator, tier); } @@ -493,7 +495,7 @@ public class StrategicSegmentAssigner implements SegmentActionHandler ? serverSelector.getServersInTierToLoadSegment(tier, segment) : strategy.findServersToLoadSegment(segment, eligibleServers); if (!serverIterator.hasNext()) { - incrementSkipStat(Stats.Segments.ASSIGN_SKIPPED, "No server chosen by strategy", segment, tier); + incrementSkipStat(Stats.Segments.ASSIGN_SKIPPED, "No strategic server", segment, tier); return 0; } @@ -586,16 +588,10 @@ public class StrategicSegmentAssigner implements SegmentActionHandler private void incrementSkipStat(CoordinatorStat stat, String reason, DataSegment segment, String tier) { - final RowKey.Builder keyBuilder - = RowKey.builder() - .add(Dimension.TIER, tier) - .add(Dimension.DATASOURCE, segment.getDataSource()); - - if (reason != null) { - keyBuilder.add(Dimension.DESCRIPTION, reason); - } - - stats.add(stat, keyBuilder.build(), 1); + final RowKey key = RowKey.with(Dimension.TIER, tier) + .with(Dimension.DATASOURCE, segment.getDataSource()) + .and(Dimension.DESCRIPTION, reason); + stats.add(stat, key, 1); } private void incrementStat(CoordinatorStat stat, DataSegment segment, String tier, long value) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java index 7d63c78c479..f8ea6cf98f8 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java @@ -64,12 +64,7 @@ public class CoordinatorRunStats public long getSegmentStat(CoordinatorStat stat, String tier, String datasource) { - return get(stat, RowKey.builder().add(Dimension.DATASOURCE, datasource).add(Dimension.TIER, tier).build()); - } - - public long getDataSourceStat(CoordinatorStat stat, String dataSource) - { - return get(stat, RowKey.forDatasource(dataSource)); + return get(stat, RowKey.with(Dimension.DATASOURCE, datasource).and(Dimension.TIER, tier)); } public long get(CoordinatorStat stat) @@ -87,7 +82,7 @@ public class CoordinatorRunStats { allStats.forEach( (rowKey, stats) -> stats.object2LongEntrySet().fastForEach( - stat -> handler.handle(rowKey.getValues(), stat.getKey(), stat.getLongValue()) + stat -> handler.handle(stat.getKey(), rowKey, stat.getLongValue()) ) ); } @@ -199,16 +194,10 @@ public class CoordinatorRunStats .addTo(stat, value); } - public void addToDatasourceStat(CoordinatorStat stat, String dataSource, long value) - { - add(stat, RowKey.forDatasource(dataSource), value); - } - public void addToSegmentStat(CoordinatorStat stat, String tier, String datasource, long value) { - RowKey rowKey = RowKey.builder() - .add(Dimension.TIER, tier) - .add(Dimension.DATASOURCE, datasource).build(); + RowKey rowKey = RowKey.with(Dimension.TIER, tier) + .and(Dimension.DATASOURCE, datasource); add(stat, rowKey, value); } @@ -267,7 +256,7 @@ public class CoordinatorRunStats public interface StatHandler { - void handle(Map dimensionValues, CoordinatorStat stat, long statValue); + void handle(CoordinatorStat stat, RowKey rowKey, long statValue); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/RowKey.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/RowKey.java index 1893f86a9cd..b0ee0a2d1f7 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/stats/RowKey.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/RowKey.java @@ -40,19 +40,16 @@ public class RowKey this.hashCode = Objects.hash(values); } - public static Builder builder() + public static Builder with(Dimension dimension, String value) { - return new RowKey.Builder(); + Builder builder = new Builder(); + builder.with(dimension, value); + return builder; } - public static RowKey forTier(String tier) + public static RowKey of(Dimension dimension, String value) { - return RowKey.builder().add(Dimension.TIER, tier).build(); - } - - public static RowKey forDatasource(String datasource) - { - return RowKey.builder().add(Dimension.DATASOURCE, datasource).build(); + return with(dimension, value).build(); } public Map getValues() @@ -83,12 +80,18 @@ public class RowKey { private final Map values = new EnumMap<>(Dimension.class); - public Builder add(Dimension dimension, String value) + public Builder with(Dimension dimension, String value) { values.put(dimension, value); return this; } + public RowKey and(Dimension dimension, String value) + { + values.put(dimension, value); + return new RowKey(values); + } + public RowKey build() { return new RowKey(values); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java index 791d3963dd8..9864aa6b3ab 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java @@ -131,11 +131,11 @@ public class Stats public static class Balancer { - public static final CoordinatorStat RAW_COST - = new CoordinatorStat("initialCost", "segment/cost/raw"); - public static final CoordinatorStat NORMALIZATION_COST - = new CoordinatorStat("normaliznCost", "segment/cost/normalization"); - public static final CoordinatorStat NORMALIZED_COST_X_1000 - = new CoordinatorStat("normalizedCost", "segment/cost/normalized"); + public static final CoordinatorStat COMPUTATION_ERRORS + = new CoordinatorStat("costComputeError", "segment/balancer/compute/error"); + public static final CoordinatorStat COMPUTATION_TIME + = new CoordinatorStat("costComputeTime", "segment/balancer/compute/time"); + public static final CoordinatorStat COMPUTATION_COUNT + = new CoordinatorStat("costComputeCount", "segment/balancer/compute/count"); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java index 33540800693..c2c77f02cbf 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java @@ -161,14 +161,14 @@ public class CoordinatorRunStatsTest ); expected.forEach( (duty, count) -> - stats.add(STAT_1, RowKey.builder().add(Dimension.DUTY, duty).build(), count) + stats.add(STAT_1, RowKey.of(Dimension.DUTY, duty), count) ); final Map actual = new HashMap<>(); stats.forEachStat( - (dimensionValues, stat, value) -> { + (stat, rowKey, value) -> { if (stat.equals(STAT_1)) { - actual.put(dimensionValues.get(Dimension.DUTY), value); + actual.put(rowKey.getValues().get(Dimension.DUTY), value); } } ); @@ -192,7 +192,7 @@ public class CoordinatorRunStatsTest debugStats.add(DEBUG_STAT_1, Key.TIER_1, 1); Assert.assertTrue(debugStats.hasStat(DEBUG_STAT_1)); - debugStats.addToDatasourceStat(DEBUG_STAT_2, "wiki", 1); + debugStats.add(DEBUG_STAT_2, RowKey.of(Dimension.DATASOURCE, "wiki"), 1); Assert.assertFalse(debugStats.hasStat(DEBUG_STAT_2)); } @@ -201,11 +201,11 @@ public class CoordinatorRunStatsTest */ private static class Key { - static final RowKey TIER_1 = RowKey.forTier("tier1"); - static final RowKey TIER_2 = RowKey.forTier("tier2"); + static final RowKey TIER_1 = RowKey.of(Dimension.TIER, "tier1"); + static final RowKey TIER_2 = RowKey.of(Dimension.TIER, "tier2"); - static final RowKey DUTY_1 = RowKey.builder().add(Dimension.DUTY, "duty1").build(); - static final RowKey DUTY_2 = RowKey.builder().add(Dimension.DUTY, "duty2").build(); + static final RowKey DUTY_1 = RowKey.of(Dimension.DUTY, "duty1"); + static final RowKey DUTY_2 = RowKey.of(Dimension.DUTY, "duty2"); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyTest.java index f4cad1cb5cd..23975352b33 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyTest.java @@ -38,6 +38,7 @@ import org.junit.Test; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -102,7 +103,7 @@ public class CachingCostBalancerStrategyTest .findDestinationServerToMoveSegment(s, firstServer, serverHolderList); ServerHolder s2 = costBalancerStrategy .findDestinationServerToMoveSegment(s, firstServer, serverHolderList); - return (s1.getServer().getName().equals(s2.getServer().getName())) ? 0 : 1; + return Objects.equals(s1, s2) ? 0 : 1; } ) .sum(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java index 6580425764a..17d0a8716a1 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java @@ -21,17 +21,19 @@ package org.apache.druid.server.coordinator.balancer; import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.client.DruidServer; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.GranularityType; import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.java.util.emitter.core.Event; -import org.apache.druid.java.util.emitter.service.AlertEvent; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.CreateDataSegments; import org.apache.druid.server.coordinator.ServerHolder; import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester; -import org.apache.druid.server.coordinator.simulate.BlockingExecutorService; +import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.coordinator.stats.RowKey; +import org.apache.druid.server.coordinator.stats.Stats; import org.apache.druid.timeline.DataSegment; import org.junit.After; import org.junit.Assert; @@ -61,7 +63,7 @@ public class CostBalancerStrategyTest @Before public void setup() { - balancerExecutor = new BlockingExecutorService("test-balance-exec-%d"); + balancerExecutor = Execs.singleThreaded("test-balance-exec-%d"); strategy = new CostBalancerStrategy(MoreExecutors.listeningDecorator(balancerExecutor)); serviceEmitter = new StubServiceEmitter("test-service", "host"); @@ -241,7 +243,7 @@ public class CostBalancerStrategyTest } @Test - public void testComputeCost() + public void testComputePlacementCost() { // Create segments for different granularities final List daySegments = @@ -265,7 +267,7 @@ public class CostBalancerStrategyTest .withNumPartitions(30) .eachOfSizeInMb(100); - // Distribute the segments randomly amongst 2 servers + // Distribute the segments randomly amongst 3 servers final List segments = new ArrayList<>(daySegments); segments.addAll(monthSegments); segments.addAll(yearSegments); @@ -284,35 +286,66 @@ public class CostBalancerStrategyTest server -> new ServerHolder(server.toImmutableDruidServer(), new LoadQueuePeonTester()) ).collect(Collectors.toList()); + final ServerHolder serverA = serverHolders.get(0); + final ServerHolder serverB = serverHolders.get(1); + final ServerHolder serverC = serverHolders.get(2); + // Verify costs for DAY, MONTH and YEAR segments - verifyServerCosts( - daySegments.get(0), - serverHolders, - 5191.500804, 8691.392080, 6418.467818 - ); - verifyServerCosts( - monthSegments.get(0), - serverHolders, - 301935.940609, 301935.940606, 304333.677669 - ); - verifyServerCosts( - yearSegments.get(0), - serverHolders, - 8468764.380437, 12098919.896931, 14501440.169452 - ); + final DataSegment daySegment = daySegments.get(0); + verifyPlacementCost(daySegment, serverA, 5191.500804); + verifyPlacementCost(daySegment, serverB, 8691.392080); + verifyPlacementCost(daySegment, serverC, 6418.467818); + + final DataSegment monthSegment = monthSegments.get(0); + verifyPlacementCost(monthSegment, serverA, 301935.940609); + verifyPlacementCost(monthSegment, serverB, 301935.940606); + verifyPlacementCost(monthSegment, serverC, 304333.677669); + + final DataSegment yearSegment = yearSegments.get(0); + verifyPlacementCost(yearSegment, serverA, 8468764.380437); + verifyPlacementCost(yearSegment, serverB, 12098919.896931); + verifyPlacementCost(yearSegment, serverC, 14501440.169452); // Verify costs for an ALL granularity segment - DataSegment allGranularitySegment = + final DataSegment allGranularitySegment = CreateDataSegments.ofDatasource(DS_WIKI) .forIntervals(1, Granularities.ALL) .eachOfSizeInMb(100).get(0); - verifyServerCosts( - allGranularitySegment, - serverHolders, - 1.1534173737329768e7, - 1.6340633534241956e7, - 1.9026400521582970e7 + verifyPlacementCost(allGranularitySegment, serverA, 1.1534173737329768e7); + verifyPlacementCost(allGranularitySegment, serverB, 1.6340633534241956e7); + verifyPlacementCost(allGranularitySegment, serverC, 1.9026400521582970e7); + } + + @Test + public void testGetAndResetStats() + { + final ServerHolder serverA = new ServerHolder( + createHistorical().toImmutableDruidServer(), + new LoadQueuePeonTester() ); + final ServerHolder serverB = new ServerHolder( + createHistorical().toImmutableDruidServer(), + new LoadQueuePeonTester() + ); + + final DataSegment segment = CreateDataSegments.ofDatasource(DS_WIKI).eachOfSizeInMb(100).get(0); + + // Verify that computation stats have been tracked + strategy.findServersToLoadSegment(segment, Arrays.asList(serverA, serverB)); + CoordinatorRunStats computeStats = strategy.getAndResetStats(); + + final RowKey rowKey = RowKey.with(Dimension.DATASOURCE, DS_WIKI) + .with(Dimension.DESCRIPTION, "LOAD") + .and(Dimension.TIER, "hot"); + Assert.assertEquals(1L, computeStats.get(Stats.Balancer.COMPUTATION_COUNT, rowKey)); + + long computeTime = computeStats.get(Stats.Balancer.COMPUTATION_TIME, rowKey); + Assert.assertTrue(computeTime >= 0 && computeTime <= 100); + Assert.assertFalse(computeStats.hasStat(Stats.Balancer.COMPUTATION_ERRORS)); + + // Verify that stats have been reset + computeStats = strategy.getAndResetStats(); + Assert.assertEquals(0, computeStats.rowCount()); } @Test @@ -334,42 +367,24 @@ public class CostBalancerStrategyTest ); } - @Test(timeout = 90_000L) - public void testFindServerRaisesAlertOnTimeout() + /** + * Verifies that the cost of placing the segment on the server is as expected. + * Also verifies that this cost is equal to the total joint cost of this segment + * with each segment currently on the server. + */ + private void verifyPlacementCost(DataSegment segment, ServerHolder server, double expectedCost) { - DataSegment segment = CreateDataSegments.ofDatasource(DS_WIKI) - .forIntervals(1, Granularities.DAY) - .startingAt("2012-10-24") - .eachOfSizeInMb(100).get(0); + double observedCost = strategy.computePlacementCost(segment, server); + Assert.assertEquals(expectedCost, observedCost, DELTA); - final LoadQueuePeonTester peon = new LoadQueuePeonTester(); - ServerHolder serverA = new ServerHolder(createHistorical().toImmutableDruidServer(), peon); - ServerHolder serverB = new ServerHolder(createHistorical().toImmutableDruidServer(), peon); - - strategy.findServersToLoadSegment(segment, Arrays.asList(serverA, serverB)); - - List events = serviceEmitter.getEvents(); - Assert.assertEquals(1, events.size()); - Assert.assertTrue(events.get(0) instanceof AlertEvent); - - AlertEvent alertEvent = (AlertEvent) events.get(0); - Assert.assertEquals( - "Cost balancer strategy timed out in action [findServersToLoadSegment]." - + " Try setting a higher value of 'balancerComputeThreads'.", - alertEvent.getDescription() - ); - } - - private void verifyServerCosts( - DataSegment segment, - List serverHolders, - double... expectedCosts - ) - { - for (int i = 0; i < serverHolders.size(); ++i) { - double observedCost = strategy.computeCost(segment, serverHolders.get(i), true); - Assert.assertEquals(expectedCosts[i], observedCost, DELTA); + double totalJointSegmentCost = 0; + for (DataSegment segmentOnServer : server.getServer().iterateAllSegments()) { + totalJointSegmentCost += CostBalancerStrategy.computeJointSegmentsCost(segment, segmentOnServer); } + if (server.isServingSegment(segment)) { + totalJointSegmentCost -= CostBalancerStrategy.computeJointSegmentsCost(segment, segment); + } + Assert.assertEquals(totalJointSegmentCost, observedCost, DELTA); } private void verifyJointSegmentsCost( diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java index a506443e8ed..39dcc9ce50a 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java @@ -37,7 +37,6 @@ import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; import org.apache.druid.server.coordinator.stats.Stats; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; -import org.easymock.EasyMock; import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.After; @@ -132,13 +131,8 @@ public class BalanceSegmentsTest @Test public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMove() { - final ServerHolder serverHolder1 = createHolder(server1, false, segment1, segment2); - final ServerHolder serverHolder2 = createHolder(server2, true, segment3, segment4); - final ServerHolder serverHolder3 = createHolder(server3, false); - - BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - expectFindDestinationAndReturn(strategy, serverHolder3); - EasyMock.replay(strategy); + final ServerHolder serverHolder1 = createHolder(server1, true, segment1, segment2, segment3, segment4); + final ServerHolder serverHolder2 = createHolder(server2, false); // ceil(3 * 0.6) = 2 segments from decommissioning servers CoordinatorDynamicConfig dynamicConfig = @@ -148,24 +142,21 @@ public class BalanceSegmentsTest .withDecommissioningMaxPercentOfMaxSegmentsToMove(60) .build(); DruidCoordinatorRuntimeParams params = - defaultRuntimeParamsBuilder(serverHolder1, serverHolder2, serverHolder3) + defaultRuntimeParamsBuilder(serverHolder1, serverHolder2) .withDynamicConfigs(dynamicConfig) - .withBalancerStrategy(strategy) + .withBalancerStrategy(balancerStrategy) .withBroadcastDatasources(broadcastDatasources) .withSegmentAssignerUsing(loadQueueManager) .build(); CoordinatorRunStats stats = runBalancer(params); - EasyMock.verify(strategy); - - // 2 segments are moved from the decommissioning server and 1 from the active server - Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource1")); - Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource2")); - Set segmentsMoved = serverHolder3.getPeon().getSegmentsToLoad(); - Assert.assertTrue(segmentsMoved.contains(segment3)); - Assert.assertTrue(segmentsMoved.contains(segment4)); - Assert.assertTrue(segmentsMoved.contains(segment1) || segmentsMoved.contains(segment2)); + // 2 segments are moved from the decommissioning server + long totalMoved = stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource1") + + stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource2"); + Assert.assertEquals(2L, totalMoved); + Set segmentsMoved = serverHolder2.getPeon().getSegmentsToLoad(); + Assert.assertEquals(2, segmentsMoved.size()); } @Test @@ -220,33 +211,27 @@ public class BalanceSegmentsTest @Test public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMoveWithNoDecommissioning() { - final ServerHolder serverHolder1 = createHolder(server1, segment1, segment2); - final ServerHolder serverHolder2 = createHolder(server2, segment3, segment4); - final ServerHolder serverHolder3 = createHolder(server3); - - BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - expectFindDestinationAndReturn(strategy, serverHolder3); - EasyMock.replay(strategy); + final ServerHolder serverHolder1 = createHolder(server1, segment1, segment2, segment3, segment4); + final ServerHolder serverHolder2 = createHolder(server2); CoordinatorDynamicConfig dynamicConfig = CoordinatorDynamicConfig.builder() .withSmartSegmentLoading(false) - .withMaxSegmentsToMove(3) + .withMaxSegmentsToMove(4) .withDecommissioningMaxPercentOfMaxSegmentsToMove(9) .build(); DruidCoordinatorRuntimeParams params = - defaultRuntimeParamsBuilder(serverHolder1, serverHolder2, serverHolder3) + defaultRuntimeParamsBuilder(serverHolder1, serverHolder2) .withDynamicConfigs(dynamicConfig) - .withBalancerStrategy(strategy) + .withBalancerStrategy(balancerStrategy) .withSegmentAssignerUsing(loadQueueManager) .build(); CoordinatorRunStats stats = runBalancer(params); - EasyMock.verify(strategy); long totalMoved = stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource1") + stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource2"); - Assert.assertEquals(3L, totalMoved); - Assert.assertEquals(3, serverHolder3.getPeon().getSegmentsToLoad().size()); + Assert.assertEquals(2L, totalMoved); + Assert.assertEquals(2, serverHolder2.getPeon().getSegmentsToLoad().size()); } /** @@ -258,18 +243,13 @@ public class BalanceSegmentsTest final ServerHolder activeServer = createHolder(server1, false, allSegments); final ServerHolder decommissioningServer = createHolder(server2, true); - BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - expectFindDestinationAndReturn(strategy, decommissioningServer); - EasyMock.replay(strategy); - DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(activeServer, decommissioningServer) - .withBalancerStrategy(strategy) + .withBalancerStrategy(balancerStrategy) .withBroadcastDatasources(broadcastDatasources) .build(); CoordinatorRunStats stats = runBalancer(params); - EasyMock.verify(strategy); Assert.assertFalse(stats.hasStat(Stats.Segments.MOVED)); } @@ -279,22 +259,17 @@ public class BalanceSegmentsTest final ServerHolder decommissioningServer = createHolder(server1, true, allSegments); final ServerHolder activeServer = createHolder(server2); - BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - expectFindDestinationAndReturn(strategy, activeServer); - EasyMock.replay(strategy); - DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(decommissioningServer, activeServer) .withDynamicConfigs( CoordinatorDynamicConfig.builder() .withSmartSegmentLoading(false) .withMaxSegmentsToMove(3).build() ) - .withBalancerStrategy(strategy) + .withBalancerStrategy(balancerStrategy) .withBroadcastDatasources(broadcastDatasources) .build(); runBalancer(params); - EasyMock.verify(strategy); Assert.assertEquals(0, decommissioningServer.getPeon().getSegmentsToLoad().size()); Assert.assertEquals(3, activeServer.getPeon().getSegmentsToLoad().size()); } @@ -358,10 +333,6 @@ public class BalanceSegmentsTest final ServerHolder holder2 = createHolder(server2, segment3, segment4); final ServerHolder holder3 = createHolder(server3); - BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - expectFindDestinationAndReturn(strategy, holder3); - EasyMock.replay(strategy); - DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(holder1, holder2, holder3) .withDynamicConfigs( @@ -372,12 +343,11 @@ public class BalanceSegmentsTest .withPercentOfSegmentsToConsiderPerMove(40) .build() ) - .withBalancerStrategy(strategy) + .withBalancerStrategy(balancerStrategy) .withBroadcastDatasources(broadcastDatasources) .build(); CoordinatorRunStats stats = runBalancer(params); - EasyMock.verify(strategy); long totalMoved = stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource1") + stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource2"); Assert.assertEquals(1L, totalMoved); @@ -463,17 +433,6 @@ public class BalanceSegmentsTest ); } - private void expectFindDestinationAndReturn(BalancerStrategy strategy, ServerHolder chosenServer) - { - EasyMock.expect( - strategy.findDestinationServerToMoveSegment( - EasyMock.anyObject(), - EasyMock.anyObject(), - EasyMock.anyObject() - ) - ).andReturn(chosenServer).anyTimes(); - } - private DataSegment createHourlySegment(String datasource, DateTime start, String version) { return new DataSegment( diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index 14fcd05d005..7487d332d1f 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -2037,7 +2037,7 @@ public class CompactSegmentsTest // all the datasources in the coordinator stats final AtomicInteger numDatasources = new AtomicInteger(); stats.forEachStat( - (dimensionValues, stat, value) -> { + (stat, rowKey, value) -> { if (stat.equals(Stats.Compaction.PENDING_BYTES) && (expectedRemainingSegments <= 0 || value == expectedRemainingSegments)) { numDatasources.incrementAndGet(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java index cd9c1e228da..46240a340de 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java @@ -76,6 +76,7 @@ public class RunRulesTest { private static final long SERVER_SIZE_10GB = 10L << 30; private static final String DATASOURCE = "test"; + private static final RowKey DATASOURCE_STAT_KEY = RowKey.of(Dimension.DATASOURCE, DATASOURCE); private LoadQueuePeon mockPeon; private RunRules ruleRunner; @@ -563,7 +564,7 @@ public class RunRulesTest .build(); CoordinatorRunStats stats = runDutyAndGetStats(params); - Assert.assertEquals(12L, stats.getDataSourceStat(Stats.Segments.DELETED, DATASOURCE)); + Assert.assertEquals(12L, stats.get(Stats.Segments.DELETED, DATASOURCE_STAT_KEY)); } @Test @@ -616,7 +617,7 @@ public class RunRulesTest CoordinatorRunStats stats = runDutyAndGetStats(params); Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, "normal", DATASOURCE)); - Assert.assertEquals(12L, stats.getDataSourceStat(Stats.Segments.DELETED, DATASOURCE)); + Assert.assertEquals(12L, stats.get(Stats.Segments.DELETED, DATASOURCE_STAT_KEY)); EasyMock.verify(mockPeon); } @@ -662,7 +663,7 @@ public class RunRulesTest CoordinatorRunStats stats = runDutyAndGetStats(params); Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, "normal", DATASOURCE)); - Assert.assertEquals(12L, stats.getDataSourceStat(Stats.Segments.DELETED, DATASOURCE)); + Assert.assertEquals(12L, stats.get(Stats.Segments.DELETED, DATASOURCE_STAT_KEY)); EasyMock.verify(mockPeon); } @@ -705,7 +706,7 @@ public class RunRulesTest CoordinatorRunStats stats = runDutyAndGetStats(params); Assert.assertFalse(stats.hasStat(Stats.Segments.DROPPED)); - Assert.assertEquals(12L, stats.getDataSourceStat(Stats.Segments.DELETED, DATASOURCE)); + Assert.assertEquals(12L, stats.get(Stats.Segments.DELETED, DATASOURCE_STAT_KEY)); EasyMock.verify(mockPeon); } @@ -1188,7 +1189,7 @@ public class RunRulesTest .build(); CoordinatorRunStats stats = runDutyAndGetStats(params); - final RowKey tierRowKey = RowKey.builder().add(Dimension.TIER, DruidServer.DEFAULT_TIER).build(); + final RowKey tierRowKey = RowKey.of(Dimension.TIER, DruidServer.DEFAULT_TIER); Assert.assertEquals( dataSegment.getSize() * numReplicants, stats.get(Stats.Tier.REQUIRED_CAPACITY, tierRowKey) @@ -1250,7 +1251,7 @@ public class RunRulesTest .build(); CoordinatorRunStats stats = runDutyAndGetStats(params); - final RowKey tierRowKey = RowKey.builder().add(Dimension.TIER, DruidServer.DEFAULT_TIER).build(); + final RowKey tierRowKey = RowKey.of(Dimension.TIER, DruidServer.DEFAULT_TIER); Assert.assertEquals( dataSegment.getSize() * numReplicants, stats.get(Stats.Tier.REQUIRED_CAPACITY, tierRowKey) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java index 668013f7ff3..c7e3b5b239f 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java @@ -19,7 +19,6 @@ package org.apache.druid.server.coordinator.rules; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -77,11 +76,8 @@ public class LoadRuleTest private ListeningExecutorService exec; private BalancerStrategy balancerStrategy; - private CachingCostBalancerStrategy cachingCostBalancerStrategy; - private SegmentLoadQueueManager loadQueueManager; private final boolean useRoundRobinAssignment; - private BalancerStrategy mockBalancerStrategy; private final AtomicInteger serverId = new AtomicInteger(); @@ -101,9 +97,6 @@ public class LoadRuleTest { exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "LoadRuleTest-%d")); balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); - cachingCostBalancerStrategy = new CachingCostBalancerStrategy(ClusterCostCache.builder().build(), exec); - - mockBalancerStrategy = EasyMock.createMock(BalancerStrategy.class); loadQueueManager = new SegmentLoadQueueManager(null, null, null); } @@ -119,13 +112,7 @@ public class LoadRuleTest final LoadQueuePeon mockPeon = createEmptyPeon(); mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - - if (!useRoundRobinAssignment) { - EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(), EasyMock.anyObject())) - .andDelegateTo(balancerStrategy) - .times(2); - } - EasyMock.replay(mockPeon, mockBalancerStrategy); + EasyMock.replay(mockPeon); DruidCluster druidCluster = DruidCluster .builder() @@ -140,14 +127,10 @@ public class LoadRuleTest Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI)); Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T2, DS_WIKI)); - EasyMock.verify(mockPeon, mockBalancerStrategy); + EasyMock.verify(mockPeon); } - private CoordinatorRunStats runRuleAndGetStats( - LoadRule rule, - DataSegment segment, - DruidCluster cluster - ) + private CoordinatorRunStats runRuleAndGetStats(LoadRule rule, DataSegment segment, DruidCluster cluster) { return runRuleAndGetStats(rule, segment, makeCoordinatorRuntimeParams(cluster, segment)); } @@ -160,7 +143,7 @@ public class LoadRuleTest { final StrategicSegmentAssigner segmentAssigner = params.getSegmentAssigner(); rule.run(segment, segmentAssigner); - return segmentAssigner.getStats(); + return params.getCoordinatorStats(); } private DruidCoordinatorRuntimeParams makeCoordinatorRuntimeParams( @@ -171,7 +154,7 @@ public class LoadRuleTest return DruidCoordinatorRuntimeParams .newBuilder(DateTimes.nowUtc()) .withDruidCluster(druidCluster) - .withBalancerStrategy(mockBalancerStrategy) + .withBalancerStrategy(balancerStrategy) .withUsedSegmentsInTest(usedSegments) .withDynamicConfigs( CoordinatorDynamicConfig.builder() @@ -189,16 +172,7 @@ public class LoadRuleTest final LoadQueuePeon mockPeon = createEmptyPeon(); mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - - LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1)); - - final DataSegment segment = createDataSegment(DS_WIKI); - - EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(), EasyMock.anyObject())) - .andDelegateTo(balancerStrategy) - .anyTimes(); - - EasyMock.replay(mockPeon, mockBalancerStrategy); + EasyMock.replay(mockPeon); ImmutableDruidServer server1 = createServer(Tier.T1).toImmutableDruidServer(); ImmutableDruidServer server2 = createServer(Tier.T1).toImmutableDruidServer(); @@ -207,6 +181,8 @@ public class LoadRuleTest .addTier(Tier.T1, new ServerHolder(server1, mockPeon), new ServerHolder(server2, mockPeon)) .build(); + final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1)); + final DataSegment segment = createDataSegment(DS_WIKI); CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster); Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, segment.getDataSource())); @@ -223,7 +199,7 @@ public class LoadRuleTest Assert.assertFalse(statsAfterLoadPrimary.hasStat(Stats.Segments.ASSIGNED)); - EasyMock.verify(mockPeon, mockBalancerStrategy); + EasyMock.verify(mockPeon); } @Test @@ -233,16 +209,7 @@ public class LoadRuleTest final LoadQueuePeon emptyPeon = createEmptyPeon(); emptyPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - - LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1)); - - final DataSegment segment = createDataSegment(DS_WIKI); - - EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(), EasyMock.anyObject())) - .andDelegateTo(balancerStrategy) - .anyTimes(); - - EasyMock.replay(emptyPeon, mockBalancerStrategy); + EasyMock.replay(emptyPeon); ImmutableDruidServer server1 = createServer(Tier.T1).toImmutableDruidServer(); ImmutableDruidServer server2 = createServer(Tier.T1).toImmutableDruidServer(); @@ -251,6 +218,8 @@ public class LoadRuleTest .addTier(Tier.T1, new ServerHolder(server1, emptyPeon), new ServerHolder(server2, emptyPeon)) .build(); + final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1)); + final DataSegment segment = createDataSegment(DS_WIKI); CoordinatorRunStats stats = runRuleAndGetStats( rule, segment, @@ -277,7 +246,7 @@ public class LoadRuleTest Assert.assertEquals(1L, statsAfterLoadPrimary.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI)); - EasyMock.verify(emptyPeon, mockBalancerStrategy); + EasyMock.verify(emptyPeon); } @Test @@ -286,16 +255,7 @@ public class LoadRuleTest final LoadQueuePeon emptyPeon = createEmptyPeon(); emptyPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - - LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1)); - - final DataSegment segment = createDataSegment(DS_WIKI); - - EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(), EasyMock.anyObject())) - .andDelegateTo(balancerStrategy) - .anyTimes(); - - EasyMock.replay(emptyPeon, mockBalancerStrategy); + EasyMock.replay(emptyPeon); ImmutableDruidServer server1 = createServer(Tier.T1).toImmutableDruidServer(); ImmutableDruidServer server2 = createServer(Tier.T1).toImmutableDruidServer(); @@ -304,6 +264,8 @@ public class LoadRuleTest .addTier(Tier.T1, new ServerHolder(server1, emptyPeon), new ServerHolder(server2, emptyPeon)) .build(); + final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1)); + final DataSegment segment = createDataSegment(DS_WIKI); CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster); // Ensure that the segment is assigned to one of the historicals @@ -323,14 +285,12 @@ public class LoadRuleTest Assert.assertFalse(statsAfterLoadPrimary.hasStat(Stats.Segments.ASSIGNED)); - EasyMock.verify(emptyPeon, mockBalancerStrategy); + EasyMock.verify(emptyPeon); } @Test public void testLoadUsedSegmentsForAllSegmentGranularityAndCachingCostBalancerStrategy() { - LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1)); - final List segments = CreateDataSegments.ofDatasource(DS_WIKI) .forIntervals(1, Granularities.ALL) @@ -338,21 +298,18 @@ public class LoadRuleTest .eachOfSizeInMb(100); final LoadQueuePeon loadingPeon = createLoadingPeon(segments.get(0), true); - loadingPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().once(); - - EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(), EasyMock.anyObject())) - .andDelegateTo(cachingCostBalancerStrategy) - .anyTimes(); - - EasyMock.replay(loadingPeon, mockBalancerStrategy); + EasyMock.replay(loadingPeon); DruidCluster druidCluster = DruidCluster .builder() .addTier(Tier.T1, createServerHolder(Tier.T1, loadingPeon, false)) .build(); + balancerStrategy = new CachingCostBalancerStrategy(ClusterCostCache.builder().build(), exec); + + LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1)); CoordinatorRunStats stats = runRuleAndGetStats( rule, segments.get(1), @@ -360,7 +317,7 @@ public class LoadRuleTest ); Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI)); - EasyMock.verify(loadingPeon, mockBalancerStrategy); + EasyMock.verify(loadingPeon); } @Test @@ -369,10 +326,7 @@ public class LoadRuleTest final LoadQueuePeon mockPeon = createEmptyPeon(); mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - EasyMock.expect(mockBalancerStrategy.pickServersToDropSegment(EasyMock.anyObject(), EasyMock.anyObject())) - .andDelegateTo(balancerStrategy) - .times(4); - EasyMock.replay(mockPeon, mockBalancerStrategy); + EasyMock.replay(mockPeon); final DataSegment segment = createDataSegment(DS_WIKI); @@ -407,14 +361,7 @@ public class LoadRuleTest final LoadQueuePeon mockPeon = createEmptyPeon(); mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - - if (!useRoundRobinAssignment) { - EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(), EasyMock.anyObject())) - .andDelegateTo(balancerStrategy) - .times(1); - } - - EasyMock.replay(mockPeon, mockBalancerStrategy); + EasyMock.replay(mockPeon); DruidCluster druidCluster = DruidCluster .builder() @@ -428,7 +375,7 @@ public class LoadRuleTest Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI)); - EasyMock.verify(mockPeon, mockBalancerStrategy); + EasyMock.verify(mockPeon); } @Test @@ -437,7 +384,7 @@ public class LoadRuleTest final LoadQueuePeon mockPeon = createEmptyPeon(); mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - EasyMock.replay(mockPeon, mockBalancerStrategy); + EasyMock.replay(mockPeon); final DataSegment segment = createDataSegment(DS_WIKI); @@ -460,19 +407,12 @@ public class LoadRuleTest Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, Tier.T1, DS_WIKI)); - EasyMock.verify(mockPeon, mockBalancerStrategy); + EasyMock.verify(mockPeon); } @Test public void testMaxLoadingQueueSize() { - if (!useRoundRobinAssignment) { - EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(), EasyMock.anyObject())) - .andDelegateTo(balancerStrategy) - .times(2); - } - EasyMock.replay(mockBalancerStrategy); - final LoadQueuePeonTester peon = new LoadQueuePeonTester(); final int maxSegmentsInQueue = 2; @@ -482,10 +422,7 @@ public class LoadRuleTest Tier.T1, new ServerHolder( createServer(Tier.T1).toImmutableDruidServer(), - peon, - false, - maxSegmentsInQueue, - 10 + peon, false, maxSegmentsInQueue, 10 ) ) .build(); @@ -497,7 +434,7 @@ public class LoadRuleTest DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams .newBuilder(DateTimes.nowUtc()) .withDruidCluster(druidCluster) - .withBalancerStrategy(mockBalancerStrategy) + .withBalancerStrategy(balancerStrategy) .withUsedSegmentsInTest(dataSegment1, dataSegment2, dataSegment3) .withDynamicConfigs( CoordinatorDynamicConfig.builder() @@ -517,8 +454,6 @@ public class LoadRuleTest Assert.assertEquals(1L, stats1.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, dataSegment1.getDataSource())); Assert.assertEquals(1L, stats2.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, dataSegment2.getDataSource())); Assert.assertEquals(0L, stats3.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, dataSegment3.getDataSource())); - - EasyMock.verify(mockBalancerStrategy); } /** @@ -530,14 +465,7 @@ public class LoadRuleTest { final LoadQueuePeon mockPeon1 = createEmptyPeon(); final LoadQueuePeon mockPeon2 = createOneCallPeonMock(); - - if (!useRoundRobinAssignment) { - EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(), EasyMock.anyObject())) - .andDelegateTo(balancerStrategy) - .times(1); - } - - EasyMock.replay(mockPeon1, mockPeon2, mockBalancerStrategy); + EasyMock.replay(mockPeon1, mockPeon2); DruidCluster druidCluster = DruidCluster .builder() @@ -550,7 +478,7 @@ public class LoadRuleTest CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster); Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T2, DS_WIKI)); - EasyMock.verify(mockPeon1, mockPeon2, mockBalancerStrategy); + EasyMock.verify(mockPeon1, mockPeon2); } /** @@ -572,15 +500,6 @@ public class LoadRuleTest ServerHolder holder4 = createServerHolder(Tier.T2, mockPeon4, false); final DataSegment segment = createDataSegment(DS_WIKI); - if (!useRoundRobinAssignment) { - EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(segment, ImmutableList.of(holder2))) - .andReturn(Collections.singletonList(holder2).iterator()); - EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(segment, ImmutableList.of(holder4, holder3))) - .andReturn(Arrays.asList(holder3, holder4).iterator()); - } - - EasyMock.replay(mockBalancerStrategy); - DruidCluster druidCluster = DruidCluster .builder() .addTier(Tier.T1, holder1, holder2) @@ -593,7 +512,7 @@ public class LoadRuleTest Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI)); Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T2, DS_WIKI)); - EasyMock.verify(mockPeon1, mockPeon2, mockPeon3, mockPeon4, mockBalancerStrategy); + EasyMock.verify(mockPeon1, mockPeon2, mockPeon3, mockPeon4); } /** @@ -606,10 +525,7 @@ public class LoadRuleTest final LoadQueuePeon mockPeon = createEmptyPeon(); mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().times(2); - EasyMock.expect(mockBalancerStrategy.pickServersToDropSegment(EasyMock.anyObject(), EasyMock.anyObject())) - .andDelegateTo(balancerStrategy) - .times(4); - EasyMock.replay(mockPeon, mockBalancerStrategy); + EasyMock.replay(mockPeon); final DataSegment segment1 = createDataSegment("foo1"); final DataSegment segment2 = createDataSegment("foo2"); @@ -652,10 +568,6 @@ public class LoadRuleTest final LoadQueuePeon mockPeon1 = new LoadQueuePeonTester(); final LoadQueuePeon mockPeon2 = new LoadQueuePeonTester(); final LoadQueuePeon mockPeon3 = new LoadQueuePeonTester(); - EasyMock.expect(mockBalancerStrategy.pickServersToDropSegment(EasyMock.anyObject(), EasyMock.anyObject())) - .andDelegateTo(balancerStrategy) - .times(4); - EasyMock.replay(mockBalancerStrategy); final DataSegment segment1 = createDataSegment(DS_WIKI); @@ -751,11 +663,11 @@ public class LoadRuleTest return mockPeon2; } - private ServerHolder createServerHolder(String tier, LoadQueuePeon mockPeon1, boolean isDecommissioning) + private ServerHolder createServerHolder(String tier, LoadQueuePeon loadQueuePeon, boolean isDecommissioning) { return new ServerHolder( createServer(tier).toImmutableDruidServer(), - mockPeon1, + loadQueuePeon, isDecommissioning ); }