Improve CostBalancerStrategy, deprecate cachingCost (#14484)

Changes to `cost` strategy:
- In every `ServerHolder`, track the number of segments per datasource per interval
- Perform cost computations for a given interval just once, and then multiply by a constant
factor to account for the total segment count in that interval (see the sketch below)
- Do not perform joint cost computations with segments that lie outside the compute interval
(± 45 days) of the segment being considered for a move
- Remove metrics `segment/cost/*` as they were coordinator killers! Turning on these metrics
(by setting `emitBalancingStats` to true) has often caused the coordinator to be stuck for hours.
Moreover, they are too complicated to decipher and do not provide any meaningful insight into
a Druid cluster.
- Add new simpler metrics `segment/balancer/compute/*` to track cost computation time,
count and errors.
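
The per-interval optimization is easiest to see in isolation. Below is a rough sketch that
mirrors `computePlacementCost` from the `CostBalancerStrategy` diff further down; the class
and method names here are hypothetical, but `CostBalancerStrategy.intervalCost(Interval, Interval)`
is the real public helper introduced by this patch:

import it.unimi.dsi.fastutil.objects.Object2IntMap;
import org.apache.druid.server.coordinator.balancer.CostBalancerStrategy;
import org.joda.time.Interval;

class PlacementCostSketch
{
  // Cost of placing a segment on a server whose projected segments have been
  // bucketed by interval. The caller is expected to have counted buckets of
  // the segment's own datasource twice, since same-datasource pairs carry a
  // 2x cost multiplier.
  static double placementCost(Interval segmentInterval, Object2IntMap<Interval> segmentsPerInterval)
  {
    double cost = 0;
    for (Object2IntMap.Entry<Interval> entry : segmentsPerInterval.object2IntEntrySet()) {
      // intervalCost is evaluated once per distinct interval and then scaled
      // by the number of segments in that interval, instead of once per segment.
      cost += CostBalancerStrategy.intervalCost(segmentInterval, entry.getKey()) * entry.getIntValue();
    }
    return cost;
  }
}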

Other changes:
- Remove flaky test from `CostBalancerStrategyTest`.
- Add tests to verify that computed cost has remained unchanged
- Remove usages of mock `BalancerStrategy` from `LoadRuleTest`, `BalanceSegmentsTest`
- Clean up `BalancerStrategy` interface
Kashif Faraz authored 2023-06-27 13:23:29 +05:30, committed by GitHub
parent 2f0a43790c
commit 4bd6bd0d4f
28 changed files with 513 additions and 565 deletions

DruidCoordinator.java

@ -20,6 +20,7 @@
package org.apache.druid.server.coordinator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
@ -98,6 +99,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@ -672,7 +674,7 @@ public class DruidCoordinator
{
try {
log.info("Starting coordinator run for group [%s]", dutyGroupName);
final long globalStart = System.currentTimeMillis();
final Stopwatch groupRunTime = Stopwatch.createStarted();
synchronized (lock) {
if (!coordLeaderSelector.isLeader()) {
@ -719,23 +721,25 @@ public class DruidCoordinator
log.info("Coordination has been paused. Duties will not run until coordination is resumed.");
}
final Stopwatch dutyRunTime = Stopwatch.createUnstarted();
for (CoordinatorDuty duty : duties) {
// Don't read state and run state in the same duty, otherwise race conditions may arise
if (!coordinationPaused
&& coordLeaderSelector.isLeader()
&& startingLeaderCounter == coordLeaderSelector.localTerm()) {
final long start = System.currentTimeMillis();
dutyRunTime.reset().start();
params = duty.run(params);
final long end = System.currentTimeMillis();
dutyRunTime.stop();
final String dutyName = duty.getClass().getName();
if (params == null) {
log.info("Stopping run for group [%s] on request of duty [%s].", dutyGroupName, dutyName);
return;
} else {
final RowKey rowKey = RowKey.builder().add(Dimension.DUTY, dutyName).build();
params.getCoordinatorStats().add(Stats.CoordinatorRun.DUTY_RUN_TIME, rowKey, end - start);
final RowKey rowKey = RowKey.of(Dimension.DUTY, dutyName);
final long dutyRunMillis = dutyRunTime.elapsed(TimeUnit.MILLISECONDS);
params.getCoordinatorStats().add(Stats.CoordinatorRun.DUTY_RUN_TIME, rowKey, dutyRunMillis);
}
}
}
@ -745,9 +749,9 @@ public class DruidCoordinator
if (allStats.rowCount() > 0) {
final AtomicInteger emittedCount = new AtomicInteger();
allStats.forEachStat(
(dimensionValues, stat, value) -> {
(stat, dimensions, value) -> {
if (stat.shouldEmit()) {
emitStat(stat, dimensionValues, value);
emitStat(stat, dimensions.getValues(), value);
emittedCount.incrementAndGet();
}
}
@ -760,7 +764,7 @@ public class DruidCoordinator
}
// Emit the runtime of the full DutiesRunnable
final long runMillis = System.currentTimeMillis() - globalStart;
final long runMillis = groupRunTime.stop().elapsed(TimeUnit.MILLISECONDS);
emitStat(Stats.CoordinatorRun.GROUP_RUN_TIME, Collections.emptyMap(), runMillis);
log.info("Finished coordinator run for group [%s] in [%d] ms", dutyGroupName, runMillis);
}
@ -771,10 +775,6 @@ public class DruidCoordinator
private void emitStat(CoordinatorStat stat, Map<Dimension, String> dimensionValues, long value)
{
if (stat.equals(Stats.Balancer.NORMALIZED_COST_X_1000)) {
value = value / 1000;
}
ServiceMetricEvent.Builder eventBuilder = new ServiceMetricEvent.Builder()
.setDimension(Dimension.DUTY_GROUP.reportedName(), dutyGroupName);
dimensionValues.forEach(

SegmentCountsPerInterval.java

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntMaps;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.util.HashMap;
import java.util.Map;
/**
* Maintains a count of segments for each datasource and interval.
*/
public class SegmentCountsPerInterval
{
private final Map<String, Object2IntMap<Interval>>
datasourceIntervalToSegmentCount = new HashMap<>();
private final Object2IntMap<Interval> intervalToTotalSegmentCount = new Object2IntOpenHashMap<>();
public void addSegment(DataSegment segment)
{
updateCountInInterval(segment, 1);
}
public void removeSegment(DataSegment segment)
{
updateCountInInterval(segment, -1);
}
public Object2IntMap<Interval> getIntervalToSegmentCount(String datasource)
{
return datasourceIntervalToSegmentCount.getOrDefault(datasource, Object2IntMaps.emptyMap());
}
public Object2IntMap<Interval> getIntervalToTotalSegmentCount()
{
return intervalToTotalSegmentCount;
}
private void updateCountInInterval(DataSegment segment, int delta)
{
intervalToTotalSegmentCount.mergeInt(segment.getInterval(), delta, Integer::sum);
datasourceIntervalToSegmentCount
.computeIfAbsent(segment.getDataSource(), ds -> new Object2IntOpenHashMap<>())
.mergeInt(segment.getInterval(), delta, Integer::sum);
}
}
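
A minimal usage sketch for this new class (the datasource name and the `segment` variable
are hypothetical; types are the fastutil/joda ones imported by the class above):

SegmentCountsPerInterval counts = new SegmentCountsPerInterval();
counts.addSegment(segment);   // some DataSegment of datasource "wikipedia"
// Counts keyed by interval, for one datasource and across all datasources:
Object2IntMap<Interval> wikiCounts = counts.getIntervalToSegmentCount("wikipedia");
Object2IntMap<Interval> totalCounts = counts.getIntervalToTotalSegmentCount();
counts.removeSegment(segment);   // decrements both maps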

ServerHolder.java

@ -32,11 +32,9 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -75,11 +73,7 @@ public class ServerHolder implements Comparable<ServerHolder>
*/
private final Map<DataSegment, SegmentAction> queuedSegments = new HashMap<>();
/**
* Segments that are expected to be loaded on this server once all the
* operations in progress have completed.
*/
private final Set<DataSegment> projectedSegments = new HashSet<>();
private final SegmentCountsPerInterval projectedSegments = new SegmentCountsPerInterval();
public ServerHolder(ImmutableDruidServer server, LoadQueuePeon peon)
{
@ -133,31 +127,31 @@ public class ServerHolder implements Comparable<ServerHolder>
AtomicInteger loadingReplicaCount
)
{
projectedSegments.addAll(server.iterateAllSegments());
for (DataSegment segment : server.iterateAllSegments()) {
projectedSegments.addSegment(segment);
}
final List<SegmentHolder> expiredSegments = new ArrayList<>();
peon.getSegmentsInQueue().forEach(
(holder) -> {
int runsInQueue = holder.incrementAndGetRunsInQueue();
if (runsInQueue > maxLifetimeInQueue) {
expiredSegments.add(holder);
}
for (SegmentHolder holder : peon.getSegmentsInQueue()) {
int runsInQueue = holder.incrementAndGetRunsInQueue();
if (runsInQueue > maxLifetimeInQueue) {
expiredSegments.add(holder);
}
final SegmentAction action = holder.getAction();
addToQueuedSegments(holder.getSegment(), simplify(action));
final SegmentAction action = holder.getAction();
addToQueuedSegments(holder.getSegment(), simplify(action));
if (action == SegmentAction.MOVE_TO) {
movingSegmentCount.incrementAndGet();
}
if (action == SegmentAction.REPLICATE) {
loadingReplicaCount.incrementAndGet();
}
}
);
if (action == SegmentAction.MOVE_TO) {
movingSegmentCount.incrementAndGet();
}
if (action == SegmentAction.REPLICATE) {
loadingReplicaCount.incrementAndGet();
}
}
peon.getSegmentsMarkedToDrop().forEach(
segment -> addToQueuedSegments(segment, SegmentAction.MOVE_FROM)
);
for (DataSegment segment : peon.getSegmentsMarkedToDrop()) {
addToQueuedSegments(segment, SegmentAction.MOVE_FROM);
}
if (!expiredSegments.isEmpty()) {
List<SegmentHolder> expiredSegmentsSubList =
@ -251,11 +245,21 @@ public class ServerHolder implements Comparable<ServerHolder>
* Segments that are expected to be loaded on this server once all the
* operations in progress have completed.
*/
public Set<DataSegment> getProjectedSegments()
public SegmentCountsPerInterval getProjectedSegments()
{
return projectedSegments;
}
public boolean isProjectedSegment(DataSegment segment)
{
SegmentAction action = getActionOnSegment(segment);
if (action == null) {
return hasSegmentLoaded(segment.getId());
} else {
return action.isLoad();
}
}
/**
* Segments that are currently in the queue for being loaded on this server.
* This does not include segments that are being moved to this server.
@ -362,10 +366,10 @@ public class ServerHolder implements Comparable<ServerHolder>
// Add to projected if load is started, remove from projected if drop has started
if (action.isLoad()) {
projectedSegments.add(segment);
projectedSegments.addSegment(segment);
sizeOfLoadingSegments += segment.getSize();
} else {
projectedSegments.remove(segment);
projectedSegments.removeSegment(segment);
if (action == SegmentAction.DROP) {
sizeOfDroppingSegments += segment.getSize();
}
@ -379,10 +383,10 @@ public class ServerHolder implements Comparable<ServerHolder>
queuedSegments.remove(segment);
if (action.isLoad()) {
projectedSegments.remove(segment);
projectedSegments.removeSegment(segment);
sizeOfLoadingSegments -= segment.getSize();
} else {
projectedSegments.add(segment);
projectedSegments.addSegment(segment);
if (action == SegmentAction.DROP) {
sizeOfDroppingSegments -= segment.getSize();
}

BalancerStrategy.java

@ -20,70 +20,60 @@
package org.apache.druid.server.coordinator.balancer;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.duty.BalanceSegments;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
/**
* This interface describes the coordinator balancing strategy, which is responsible for making decisions on where
* to place {@link DataSegment}s on historical servers (described by {@link ServerHolder}). The balancing strategy
* is used by {@link org.apache.druid.server.coordinator.rules.LoadRule} to assign and drop segments, and by
* {@link BalanceSegments} to migrate segments between historicals.
* Segment balancing strategy, used in every coordinator run by
* {@link org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner}
* to choose optimal servers to load, move or drop a segment.
*/
public interface BalancerStrategy
{
/**
* Finds the best server to move a segment to according to the balancing strategy.
* Finds the best server from the list of {@code destinationServers} to load
* the {@code segmentToMove}, if it is moved from the {@code sourceServer}.
* <p>
* In order to avoid unnecessary moves when the segment is already optimally placed,
* include the {@code sourceServer} in the list of {@code destinationServers}.
*
* @param proposalSegment segment to move
* @param sourceServer Server the segment is currently placed on.
* @param destinationServers servers to consider as move destinations
* @return The server to move to, or null if no move should be made or no server is suitable
* @return The server to move to, or null if the segment is already optimally placed.
*/
@Nullable
ServerHolder findDestinationServerToMoveSegment(
DataSegment proposalSegment,
DataSegment segmentToMove,
ServerHolder sourceServer,
List<ServerHolder> destinationServers
);
/**
* Finds the best servers on which to place the {@code proposalSegment}.
* This method can be used both for placing the first copy of a segment
* in the tier or a replica of the segment.
* Finds the best servers to load the given segment. This method can be used
* both for placing the first copy of a segment in a tier or a replica of an
* already available segment.
*
* @param proposalSegment segment to place on servers
* @param serverHolders servers to consider as segment homes
* @return Iterator over the best servers (in order) on which the segment
* can be placed.
* @return Iterator over the best servers (in order of preference) to load
* the segment.
*/
Iterator<ServerHolder> findServersToLoadSegment(
DataSegment proposalSegment,
DataSegment segmentToLoad,
List<ServerHolder> serverHolders
);
/**
* Returns an iterator for a set of servers to drop from, ordered by preference of which server to drop from first
* for a given drop strategy. One or more segments may be dropped, depending on how much the segment is
* over-replicated.
* @param toDropSegment segment to drop from one or more servers
* @param serverHolders set of historicals to consider dropping from
* @return Iterator for set of historicals, ordered by drop preference
* Finds the best servers to drop the given segment.
*
* @return Iterator over the servers (in order of preference) to drop the segment
*/
Iterator<ServerHolder> pickServersToDropSegment(DataSegment toDropSegment, NavigableSet<ServerHolder> serverHolders);
Iterator<ServerHolder> findServersToDropSegment(DataSegment segmentToDrop, List<ServerHolder> serverHolders);
/**
* Add balancing strategy stats during the 'balanceTier' operation of
* {@link BalanceSegments} to be included
* @param tier historical tier being balanced
* @param stats stats object to add balancing strategy stats to
* @param serverHolderList servers in tier being balanced
* Returns the stats collected by the strategy in the current run and resets
* the stats collector for the next run.
*/
void emitStats(String tier, CoordinatorRunStats stats, List<ServerHolder> serverHolderList);
CoordinatorRunStats getAndResetStats();
}
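
For orientation, this is how a consumer such as StrategicSegmentAssigner would drive the
cleaned-up interface (a hypothetical sketch; the `strategy`, `segment`, `sourceServer` and
`servers` variables are assumptions):

// Servers to load a new replica on, best candidates first:
Iterator<ServerHolder> loadTargets = strategy.findServersToLoadSegment(segment, servers);
// A better home for a segment; null means it is already optimally placed:
ServerHolder destination = strategy.findDestinationServerToMoveSegment(segment, sourceServer, servers);
// Servers to drop an over-replicated segment from, preferred first:
Iterator<ServerHolder> dropTargets = strategy.findServersToDropSegment(segment, servers);
// At the end of the coordinator run, collect and reset the strategy's stats:
CoordinatorRunStats strategyStats = strategy.getAndResetStats();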

CachingCostBalancerStrategy.java

@ -28,10 +28,8 @@ import org.apache.druid.timeline.DataSegment;
import java.util.Collections;
import java.util.Set;
public class CachingCostBalancerStrategy extends CostBalancerStrategy
{
private final ClusterCostCache clusterCostCache;
public CachingCostBalancerStrategy(ClusterCostCache clusterCostCache, ListeningExecutorService exec)
@ -41,13 +39,8 @@ public class CachingCostBalancerStrategy extends CostBalancerStrategy
}
@Override
protected double computeCost(DataSegment proposalSegment, ServerHolder server, boolean includeCurrentServer)
protected double computePlacementCost(DataSegment proposalSegment, ServerHolder server)
{
// (optional) Don't include server if it cannot load the segment
if (!includeCurrentServer && !server.canLoadSegment(proposalSegment)) {
return Double.POSITIVE_INFINITY;
}
final String serverName = server.getServer().getName();
double cost = clusterCostCache.computeCost(serverName, proposalSegment);

CachingCostBalancerStrategyFactory.java

@ -125,6 +125,10 @@ public class CachingCostBalancerStrategyFactory implements BalancerStrategyFacto
@Override
public BalancerStrategy createBalancerStrategy(final ListeningExecutorService exec)
{
LOG.warn(
"'cachingCost' balancer strategy has been deprecated as it can lead to"
+ " unbalanced clusters. Use 'cost' strategy instead."
);
if (!isInitialized() && config.isAwaitInitialization()) {
try {
final long startMillis = System.currentTimeMillis();

CostBalancerStrategy.java

@ -19,16 +19,21 @@
package org.apache.druid.server.coordinator.balancer;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.commons.math3.util.FastMath;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordinator.SegmentCountsPerInterval;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.loading.SegmentAction;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
@ -38,11 +43,10 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
public class CostBalancerStrategy implements BalancerStrategy
{
@ -63,6 +67,20 @@ public class CostBalancerStrategy implements BalancerStrategy
= Comparator.<Pair<Double, ServerHolder>, Double>comparing(pair -> pair.lhs)
.thenComparing(pair -> pair.rhs);
private final CoordinatorRunStats stats = new CoordinatorRunStats();
public static double computeJointSegmentsCost(DataSegment segment, Iterable<DataSegment> segmentSet)
{
final Interval costComputeInterval = getCostComputeInterval(segment);
double totalCost = 0;
for (DataSegment s : segmentSet) {
if (costComputeInterval.overlaps(s.getInterval())) {
totalCost += computeJointSegmentsCost(segment, s);
}
}
return totalCost;
}
/**
* This defines the unnormalized cost function between two segments.
*
@ -83,15 +101,20 @@ public class CostBalancerStrategy implements BalancerStrategy
final Interval intervalA = segmentA.getInterval();
final Interval intervalB = segmentB.getInterval();
// constant cost-multiplier for segments of the same datasource
final double multiplier = segmentA.getDataSource().equals(segmentB.getDataSource())
? 2.0 : 1.0;
return intervalCost(intervalA, intervalB) * multiplier;
}
public static double intervalCost(Interval intervalA, Interval intervalB)
{
final double t0 = intervalA.getStartMillis();
final double t1 = (intervalA.getEndMillis() - t0) / MILLIS_FACTOR;
final double start = (intervalB.getStartMillis() - t0) / MILLIS_FACTOR;
final double end = (intervalB.getEndMillis() - t0) / MILLIS_FACTOR;
// constant cost-multiplier for segments of the same datasource
final double multiplier = segmentA.getDataSource().equals(segmentB.getDataSource()) ? 2.0 : 1.0;
return INV_LAMBDA_SQUARE * intervalCost(t1, start, end) * multiplier;
return INV_LAMBDA_SQUARE * intervalCost(t1, start, end);
}
/**
@ -199,161 +222,119 @@ public class CostBalancerStrategy implements BalancerStrategy
@Override
public Iterator<ServerHolder> findServersToLoadSegment(
DataSegment proposalSegment,
DataSegment segmentToLoad,
List<ServerHolder> serverHolders
)
{
return getServersByPlacementCost(proposalSegment, serverHolders, false, "findServersToLoadSegment");
return orderServersByPlacementCost(segmentToLoad, serverHolders, SegmentAction.LOAD)
.stream()
.filter(server -> server.canLoadSegment(segmentToLoad))
.iterator();
}
@Override
public ServerHolder findDestinationServerToMoveSegment(
DataSegment proposalSegment,
DataSegment segmentToMove,
ServerHolder sourceServer,
List<ServerHolder> serverHolders
)
{
Iterator<ServerHolder> servers =
getServersByPlacementCost(proposalSegment, serverHolders, true, "findServerToMoveSegment");
return servers.hasNext() ? servers.next() : null;
}
public static double computeJointSegmentsCost(DataSegment segment, Iterable<DataSegment> segmentSet)
{
double totalCost = 0;
for (DataSegment s : segmentSet) {
totalCost += computeJointSegmentsCost(segment, s);
List<ServerHolder> servers =
orderServersByPlacementCost(segmentToMove, serverHolders, SegmentAction.MOVE_TO);
if (servers.isEmpty()) {
return null;
}
return totalCost;
ServerHolder candidateServer = servers.get(0);
return candidateServer.equals(sourceServer) ? null : candidateServer;
}
@Override
public Iterator<ServerHolder> pickServersToDropSegment(
public Iterator<ServerHolder> findServersToDropSegment(
DataSegment segmentToDrop,
NavigableSet<ServerHolder> serverHolders
List<ServerHolder> serverHolders
)
{
List<ServerHolder> serversByCost = Lists.newArrayList(
getServersByPlacementCost(segmentToDrop, serverHolders, true, "pickServersToDropSegment")
);
List<ServerHolder> serversByCost =
orderServersByPlacementCost(segmentToDrop, serverHolders, SegmentAction.DROP);
// Prioritize drop from highest cost servers
return Lists.reverse(serversByCost).iterator();
}
/**
* Calculates the initial cost of the Druid segment configuration.
*
* @param serverHolders A list of ServerHolders for a particular tier.
*
* @return The initial cost of the Druid tier.
*/
public double calculateInitialTotalCost(final List<ServerHolder> serverHolders)
{
double cost = 0;
for (ServerHolder server : serverHolders) {
// segments are dumped into an array because it's probably better than iterating the iterateAllSegments() result
// quadratically in a loop, which can generate garbage in the form of Stream, Spliterator, Iterator, etc. objects
// whose total memory volume exceeds the size of the DataSegment array.
DataSegment[] segments = server.getServer().iterateAllSegments().toArray(new DataSegment[0]);
for (DataSegment s1 : segments) {
for (DataSegment s2 : segments) {
cost += computeJointSegmentsCost(s1, s2);
}
}
}
return cost;
}
/**
* Calculates the cost normalization. This is such that the normalized cost is lower bounded
* by 1 (e.g. when each segment gets its own historical node).
*
* @param serverHolders A list of ServerHolders for a particular tier.
*
* @return The normalization value (the sum of the diagonal entries in the
* pairwise cost matrix). This is the cost of a cluster if each
* segment were to get its own historical node.
*/
public double calculateNormalization(final List<ServerHolder> serverHolders)
{
double cost = 0;
for (ServerHolder server : serverHolders) {
for (DataSegment segment : server.getServedSegments()) {
cost += computeJointSegmentsCost(segment, segment);
}
}
return cost;
}
@Override
public void emitStats(String tier, CoordinatorRunStats stats, List<ServerHolder> serverHolderList)
public CoordinatorRunStats getAndResetStats()
{
final double initialTotalCost = calculateInitialTotalCost(serverHolderList);
final double normalization = calculateNormalization(serverHolderList);
final double normalizedInitialCost = initialTotalCost / normalization;
final RowKey rowKey = RowKey.forTier(tier);
stats.add(Stats.Balancer.RAW_COST, rowKey, (long) initialTotalCost);
stats.add(Stats.Balancer.NORMALIZATION_COST, rowKey, (long) normalization);
stats.add(Stats.Balancer.NORMALIZED_COST_X_1000, rowKey, (long) (normalizedInitialCost * 1000));
log.info(
"[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f]",
tier, initialTotalCost, normalization, normalizedInitialCost
);
return stats.getSnapshotAndReset();
}
protected double computeCost(
final DataSegment proposalSegment,
final ServerHolder server,
final boolean includeCurrentServer
)
/**
* Computes the cost of placing a segment on this server.
*/
protected double computePlacementCost(DataSegment proposalSegment, ServerHolder server)
{
// (optional) Don't include server if it cannot load the segment
if (!includeCurrentServer && !server.canLoadSegment(proposalSegment)) {
return Double.POSITIVE_INFINITY;
}
final Interval costComputeInterval = getCostComputeInterval(proposalSegment);
// The contribution to the total cost of a given server by proposing to move the segment to that server is...
double cost = 0d;
// Compute number of segments in each interval
final Object2IntOpenHashMap<Interval> intervalToSegmentCount = new Object2IntOpenHashMap<>();
// the sum of the costs of segments expected to be on the server (loaded + loading - dropping)
Set<DataSegment> projectedSegments = server.getProjectedSegments();
cost += computeJointSegmentsCost(proposalSegment, projectedSegments);
final SegmentCountsPerInterval projectedSegments = server.getProjectedSegments();
projectedSegments.getIntervalToTotalSegmentCount().object2IntEntrySet().forEach(entry -> {
final Interval interval = entry.getKey();
if (costComputeInterval.overlaps(interval)) {
intervalToSegmentCount.addTo(interval, entry.getIntValue());
}
});
// minus the self cost of the segment
if (projectedSegments.contains(proposalSegment)) {
cost -= computeJointSegmentsCost(proposalSegment, proposalSegment);
// Count the segments for the same datasource twice as they have twice the cost
final String datasource = proposalSegment.getDataSource();
projectedSegments.getIntervalToSegmentCount(datasource).object2IntEntrySet().forEach(entry -> {
final Interval interval = entry.getKey();
if (costComputeInterval.overlaps(interval)) {
intervalToSegmentCount.addTo(interval, entry.getIntValue());
}
});
// Compute joint cost for each interval
double cost = 0;
final Interval segmentInterval = proposalSegment.getInterval();
cost += intervalToSegmentCount.object2IntEntrySet().stream().mapToDouble(
entry -> intervalCost(segmentInterval, entry.getKey())
* entry.getIntValue()
).sum();
// Minus the self cost of the segment
if (server.isProjectedSegment(proposalSegment)) {
cost -= intervalCost(segmentInterval, segmentInterval) * 2.0;
}
return cost;
}
/**
* Returns an iterator over the servers, ordered by increasing cost for
* placing the given segment on that server.
*
* @param includeCurrentServer true if the server already serving a replica
* of this segment should be included in the results
* Orders the servers by increasing cost for placing the given segment.
*/
private Iterator<ServerHolder> getServersByPlacementCost(
DataSegment proposalSegment,
Iterable<ServerHolder> serverHolders,
boolean includeCurrentServer,
String action
private List<ServerHolder> orderServersByPlacementCost(
DataSegment segment,
List<ServerHolder> serverHolders,
SegmentAction action
)
{
final Stopwatch computeTime = Stopwatch.createStarted();
final List<ListenableFuture<Pair<Double, ServerHolder>>> futures = new ArrayList<>();
for (ServerHolder server : serverHolders) {
futures.add(
exec.submit(
() -> Pair.of(computeCost(proposalSegment, server, includeCurrentServer), server)
() -> Pair.of(computePlacementCost(segment, server), server)
)
);
}
String tier = serverHolders.isEmpty() ? null : serverHolders.get(0).getServer().getTier();
final RowKey metricKey = RowKey.with(Dimension.TIER, tier)
.with(Dimension.DATASOURCE, segment.getDataSource())
.and(Dimension.DESCRIPTION, action.name());
final PriorityQueue<Pair<Double, ServerHolder>> costPrioritizedServers =
new PriorityQueue<>(CHEAPEST_SERVERS_FIRST);
try {
@ -364,30 +345,57 @@ public class CostBalancerStrategy implements BalancerStrategy
);
}
catch (Exception e) {
alertOnFailure(e, action);
stats.add(Stats.Balancer.COMPUTATION_ERRORS, metricKey, 1);
handleFailure(e, segment, action);
}
// Include current server only if specified
return costPrioritizedServers.stream()
.filter(pair -> includeCurrentServer || pair.rhs.canLoadSegment(proposalSegment))
.map(pair -> pair.rhs).iterator();
// Report computation stats
computeTime.stop();
stats.add(Stats.Balancer.COMPUTATION_COUNT, metricKey, 1);
stats.add(Stats.Balancer.COMPUTATION_TIME, metricKey, computeTime.elapsed(TimeUnit.MILLISECONDS));
return costPrioritizedServers.stream().map(pair -> pair.rhs)
.collect(Collectors.toList());
}
private void alertOnFailure(Exception e, String action)
private void handleFailure(
Exception e,
DataSegment segment,
SegmentAction action
)
{
// Do not alert if the executor has been shut down
final String reason;
String suggestion = "";
if (exec.isShutdown()) {
log.noStackTrace().info("Balancer executor was terminated. Failing action [%s].", action);
return;
reason = "Executor shutdown";
} else if (e instanceof TimeoutException) {
reason = "Timed out";
suggestion = " Try setting a higher value for 'balancerComputeThreads'.";
} else {
reason = e.getMessage();
}
final boolean hasTimedOut = e instanceof TimeoutException;
final String message = StringUtils.format(
"Cost balancer strategy %s in action [%s].%s",
hasTimedOut ? "timed out" : "failed", action,
hasTimedOut ? " Try setting a higher value of 'balancerComputeThreads'." : ""
);
log.makeAlert(e, message).emit();
String msgFormat = "Cost strategy computations failed for action[%s] on segment[%s] due to reason[%s].[%s]";
log.noStackTrace().warn(e, msgFormat, action, segment.getId(), reason, suggestion);
}
/**
* The cost compute interval for a segment is {@code [start-45days, end+45days)}.
* This is because the joint cost of any two segments that are 45 days apart is
* negligible.
*/
private static Interval getCostComputeInterval(DataSegment segment)
{
final Interval segmentInterval = segment.getInterval();
if (Intervals.isEternity(segmentInterval)) {
return segmentInterval;
} else {
final long maxGap = TimeUnit.DAYS.toMillis(45);
return Intervals.utc(
segmentInterval.getStartMillis() - maxGap,
segmentInterval.getEndMillis() + maxGap
);
}
}
}
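
For reference, the joint cost implemented above is an exponential-decay integral over the
two segment intervals, scaled for same-datasource pairs:

  cost(A, B) = m · ∫_A ∫_B e^(−λ|x−y|) dx dy,   m = 2 if A and B share a datasource, else 1

Assuming the 24-hour half-life this strategy has historically used (λ = ln 2 / 24h; the
constant itself is not shown in the hunks above), two intervals separated by 45 days are
damped by a factor of e^(−λ·45d) = 2^(−45) ≈ 2.8 × 10^(−14), which is why
getCostComputeInterval can safely clip joint computations to [start − 45 days, end + 45 days).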

DiskNormalizedCostBalancerStrategy.java

@ -35,13 +35,12 @@ public class DiskNormalizedCostBalancerStrategy extends CostBalancerStrategy
* This ensures that all the hosts will have the same % disk utilization.
*/
@Override
protected double computeCost(
protected double computePlacementCost(
final DataSegment proposalSegment,
final ServerHolder server,
final boolean includeCurrentServer
final ServerHolder server
)
{
double cost = super.computeCost(proposalSegment, server, includeCurrentServer);
double cost = super.computePlacementCost(proposalSegment, server);
if (cost == Double.POSITIVE_INFINITY) {
return cost;

RandomBalancerStrategy.java

@ -27,7 +27,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.stream.Collectors;
/**
@ -39,16 +38,18 @@ import java.util.stream.Collectors;
*/
public class RandomBalancerStrategy implements BalancerStrategy
{
private static final CoordinatorRunStats EMPTY_STATS = new CoordinatorRunStats();
@Override
public Iterator<ServerHolder> findServersToLoadSegment(
DataSegment proposalSegment,
DataSegment segmentToLoad,
List<ServerHolder> serverHolders
)
{
// Filter out servers which cannot load this segment
final List<ServerHolder> usableServerHolders =
serverHolders.stream()
.filter(server -> server.canLoadSegment(proposalSegment))
.filter(server -> server.canLoadSegment(segmentToLoad))
.collect(Collectors.toList());
Collections.shuffle(usableServerHolders);
return usableServerHolders.iterator();
@ -56,7 +57,7 @@ public class RandomBalancerStrategy implements BalancerStrategy
@Override
public ServerHolder findDestinationServerToMoveSegment(
DataSegment proposalSegment,
DataSegment segmentToMove,
ServerHolder sourceServer,
List<ServerHolder> serverHolders
)
@ -66,7 +67,7 @@ public class RandomBalancerStrategy implements BalancerStrategy
}
@Override
public Iterator<ServerHolder> pickServersToDropSegment(DataSegment toDropSegment, NavigableSet<ServerHolder> serverHolders)
public Iterator<ServerHolder> findServersToDropSegment(DataSegment segmentToDrop, List<ServerHolder> serverHolders)
{
List<ServerHolder> serverList = new ArrayList<>(serverHolders);
Collections.shuffle(serverList);
@ -74,7 +75,8 @@ public class RandomBalancerStrategy implements BalancerStrategy
}
@Override
public void emitStats(String tier, CoordinatorRunStats stats, List<ServerHolder> serverHolderList)
public CoordinatorRunStats getAndResetStats()
{
return EMPTY_STATS;
}
}

TierSegmentBalancer.java

@ -19,7 +19,6 @@
package org.apache.druid.server.coordinator.balancer;
import com.google.common.collect.Lists;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
@ -56,11 +55,9 @@ public class TierSegmentBalancer
private final DruidCoordinatorRuntimeParams params;
private final StrategicSegmentAssigner segmentAssigner;
private final BalancerStrategy strategy;
private final SegmentLoadingConfig loadingConfig;
private final CoordinatorRunStats runStats;
private final Set<ServerHolder> allServers;
private final List<ServerHolder> activeServers;
private final List<ServerHolder> decommissioningServers;
private final int totalMaxSegmentsToMove;
@ -77,7 +74,6 @@ public class TierSegmentBalancer
this.params = params;
this.segmentAssigner = params.getSegmentAssigner();
this.strategy = params.getBalancerStrategy();
this.loadingConfig = params.getSegmentLoadingConfig();
this.totalMaxSegmentsToMove = loadingConfig.getMaxSegmentsToMove();
this.runStats = segmentAssigner.getStats();
@ -86,7 +82,6 @@ public class TierSegmentBalancer
servers.stream().collect(Collectors.partitioningBy(ServerHolder::isDecommissioning));
this.decommissioningServers = partitions.get(true);
this.activeServers = partitions.get(false);
this.allServers = servers;
this.movingSegmentCount = activeServers.stream().mapToInt(ServerHolder::getNumMovingSegments).sum();
}
@ -128,10 +123,6 @@ public class TierSegmentBalancer
"Moved [%d] segments out of max [%d] between active servers in tier [%s].",
movedGeneralSegments, maxGeneralSegmentsToMove, tier
);
if (loadingConfig.isEmitBalancingStats()) {
strategy.emitStats(tier, runStats, Lists.newArrayList(allServers));
}
}
private int moveSegmentsFromTo(
@ -224,13 +215,9 @@ public class TierSegmentBalancer
private void markUnmoved(String reason, DataSegment segment)
{
final RowKey key
= RowKey.builder()
.add(Dimension.TIER, tier)
.add(Dimension.DATASOURCE, segment.getDataSource())
.add(Dimension.DESCRIPTION, reason)
.build();
RowKey key = RowKey.with(Dimension.TIER, tier)
.with(Dimension.DATASOURCE, segment.getDataSource())
.and(Dimension.DESCRIPTION, reason);
runStats.add(Stats.Segments.MOVE_SKIPPED, key, 1);
}

BalanceSegments.java

@ -24,6 +24,7 @@ import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.balancer.TierSegmentBalancer;
import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
/**
*
@ -57,6 +58,11 @@ public class BalanceSegments implements CoordinatorDuty
(tier, servers) -> new TierSegmentBalancer(tier, servers, params).run()
);
CoordinatorRunStats runStats = params.getCoordinatorStats();
params.getBalancerStrategy()
.getAndResetStats()
.forEachStat(runStats::add);
return params;
}

CollectSegmentAndServerStats.java

@ -72,7 +72,7 @@ public class CollectSegmentAndServerStats implements CoordinatorDuty
final DruidCluster cluster = params.getDruidCluster();
cluster.getHistoricals().forEach((tier, historicals) -> {
final RowKey rowKey = RowKey.forTier(tier);
final RowKey rowKey = RowKey.of(Dimension.TIER, tier);
stats.add(Stats.Tier.HISTORICAL_COUNT, rowKey, historicals.size());
long totalCapacity = historicals.stream().map(ServerHolder::getMaxSize).reduce(0L, Long::sum);
stats.add(Stats.Tier.TOTAL_CAPACITY, rowKey, totalCapacity);
@ -80,20 +80,23 @@ public class CollectSegmentAndServerStats implements CoordinatorDuty
// Collect load queue stats
coordinator.getLoadManagementPeons().forEach((serverName, queuePeon) -> {
final RowKey rowKey = RowKey.builder().add(Dimension.SERVER, serverName).build();
final RowKey rowKey = RowKey.of(Dimension.SERVER, serverName);
stats.add(Stats.SegmentQueue.BYTES_TO_LOAD, rowKey, queuePeon.getSizeOfSegmentsToLoad());
stats.add(Stats.SegmentQueue.NUM_TO_LOAD, rowKey, queuePeon.getSegmentsToLoad().size());
stats.add(Stats.SegmentQueue.NUM_TO_DROP, rowKey, queuePeon.getSegmentsToDrop().size());
queuePeon.getAndResetStats().forEachStat(
(dimValues, stat, statValue) ->
stats.add(stat, createRowKeyForServer(serverName, dimValues), statValue)
(stat, key, statValue) ->
stats.add(stat, createRowKeyForServer(serverName, key.getValues()), statValue)
);
});
coordinator.getDatasourceToUnavailableSegmentCount().forEach(
(dataSource, numUnavailable) ->
stats.addToDatasourceStat(Stats.Segments.UNAVAILABLE, dataSource, numUnavailable)
(dataSource, numUnavailable) -> stats.add(
Stats.Segments.UNAVAILABLE,
RowKey.of(Dimension.DATASOURCE, dataSource),
numUnavailable
)
);
coordinator.getTierToDatasourceToUnderReplicatedCount(false).forEach(
@ -108,17 +111,18 @@ public class CollectSegmentAndServerStats implements CoordinatorDuty
(dataSource, timeline) -> {
long totalSizeOfUsedSegments = timeline.iterateAllObjects().stream()
.mapToLong(DataSegment::getSize).sum();
stats.addToDatasourceStat(Stats.Segments.USED_BYTES, dataSource, totalSizeOfUsedSegments);
stats.addToDatasourceStat(Stats.Segments.USED, dataSource, timeline.getNumObjects());
RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, dataSource);
stats.add(Stats.Segments.USED_BYTES, datasourceKey, totalSizeOfUsedSegments);
stats.add(Stats.Segments.USED, datasourceKey, timeline.getNumObjects());
}
);
}
private RowKey createRowKeyForServer(String serverName, Map<Dimension, String> dimensionValues)
{
final RowKey.Builder builder = RowKey.builder();
dimensionValues.forEach(builder::add);
builder.add(Dimension.SERVER, serverName);
final RowKey.Builder builder = RowKey.with(Dimension.SERVER, serverName);
dimensionValues.forEach(builder::with);
return builder.build();
}

CompactSegments.java

@ -43,6 +43,7 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
@ -593,7 +594,7 @@ public class CompactSegments implements CoordinatorCustomDuty
CoordinatorRunStats stats
)
{
final RowKey rowKey = RowKey.forDatasource(dataSource);
final RowKey rowKey = RowKey.of(Dimension.DATASOURCE, dataSource);
stats.add(Stats.Compaction.PENDING_BYTES, rowKey, autoCompactionSnapshot.getBytesAwaitingCompaction());
stats.add(Stats.Compaction.PENDING_SEGMENTS, rowKey, autoCompactionSnapshot.getSegmentCountAwaitingCompaction());

MarkOvershadowedSegmentsAsUnused.java

@ -28,6 +28,8 @@ import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
@ -100,7 +102,8 @@ public class MarkOvershadowedSegmentsAsUnused implements CoordinatorDuty
final CoordinatorRunStats stats = params.getCoordinatorStats();
datasourceToUnusedSegments.forEach(
(datasource, unusedSegments) -> {
stats.addToDatasourceStat(Stats.Segments.OVERSHADOWED, datasource, unusedSegments.size());
RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, datasource);
stats.add(Stats.Segments.OVERSHADOWED, datasourceKey, unusedSegments.size());
coordinator.markSegmentsAsUnused(datasource, unusedSegments);
}
);

RunRules.java

@ -67,7 +67,7 @@ public class RunRules implements CoordinatorDuty
final Set<DataSegment> overshadowed = params.getDataSourcesSnapshot().getOvershadowedSegments();
final Set<DataSegment> usedSegments = params.getUsedSegments();
log.info(
"Applying retention rules on [%d] used segments, skipping [%d] overshadowed segments.",
"Applying retention rules on [%,d] used segments, skipping [%,d] overshadowed segments.",
usedSegments.size(), overshadowed.size()
);

HttpLoadQueuePeon.java

@ -40,6 +40,8 @@ import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.CoordinatorStat;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.jboss.netty.handler.codec.http.HttpHeaders;
@ -543,7 +545,9 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
private void incrementStat(SegmentHolder holder, RequestStatus status)
{
stats.addToDatasourceStat(status.datasourceStat, holder.getSegment().getDataSource(), 1);
RowKey rowKey = RowKey.with(Dimension.DATASOURCE, holder.getSegment().getDataSource())
.and(Dimension.DESCRIPTION, holder.getAction().name());
stats.add(status.datasourceStat, rowKey, 1);
}
private void executeCallbacks(SegmentHolder holder, boolean success)

SegmentLoadingConfig.java

@ -61,7 +61,7 @@ public class SegmentLoadingConfig
log.info(
"Smart segment loading is enabled. Recomputed replicationThrottleLimit"
+ " [%,d] (%d%% of used segments [%,d]) and maxSegmentsToMove [%d].",
+ " [%,d] (%d%% of used segments [%,d]) and maxSegmentsToMove [%,d].",
replicationThrottleLimit, throttlePercentage, numUsedSegments, maxSegmentsToMove
);

StrategicSegmentAssigner.java

@ -33,6 +33,7 @@ import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@ -296,7 +297,7 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
private void reportTierCapacityStats(DataSegment segment, int requiredReplicas, String tier)
{
final RowKey rowKey = RowKey.forTier(tier);
final RowKey rowKey = RowKey.of(Dimension.TIER, tier);
stats.updateMax(Stats.Tier.REPLICATION_FACTOR, rowKey, requiredReplicas);
stats.add(Stats.Tier.REQUIRED_CAPACITY, rowKey, segment.getSize() * requiredReplicas);
}
@ -342,7 +343,8 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
public void deleteSegment(DataSegment segment)
{
loadQueueManager.deleteSegment(segment);
stats.addToDatasourceStat(Stats.Segments.DELETED, segment.getDataSource(), 1);
RowKey rowKey = RowKey.of(Dimension.DATASOURCE, segment.getDataSource());
stats.add(Stats.Segments.DELETED, rowKey, 1);
}
/**
@ -429,9 +431,9 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
if (numToDrop > numDropsQueued) {
remainingNumToDrop = numToDrop - numDropsQueued;
Iterator<ServerHolder> serverIterator =
(useRoundRobinAssignment || eligibleLiveServers.size() >= remainingNumToDrop)
(useRoundRobinAssignment || eligibleLiveServers.size() <= remainingNumToDrop)
? eligibleLiveServers.iterator()
: strategy.pickServersToDropSegment(segment, eligibleLiveServers);
: strategy.findServersToDropSegment(segment, new ArrayList<>(eligibleLiveServers));
numDropsQueued += dropReplicasFromServers(remainingNumToDrop, segment, serverIterator, tier);
}
@ -493,7 +495,7 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
? serverSelector.getServersInTierToLoadSegment(tier, segment)
: strategy.findServersToLoadSegment(segment, eligibleServers);
if (!serverIterator.hasNext()) {
incrementSkipStat(Stats.Segments.ASSIGN_SKIPPED, "No server chosen by strategy", segment, tier);
incrementSkipStat(Stats.Segments.ASSIGN_SKIPPED, "No strategic server", segment, tier);
return 0;
}
@ -586,16 +588,10 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
private void incrementSkipStat(CoordinatorStat stat, String reason, DataSegment segment, String tier)
{
final RowKey.Builder keyBuilder
= RowKey.builder()
.add(Dimension.TIER, tier)
.add(Dimension.DATASOURCE, segment.getDataSource());
if (reason != null) {
keyBuilder.add(Dimension.DESCRIPTION, reason);
}
stats.add(stat, keyBuilder.build(), 1);
final RowKey key = RowKey.with(Dimension.TIER, tier)
.with(Dimension.DATASOURCE, segment.getDataSource())
.and(Dimension.DESCRIPTION, reason);
stats.add(stat, key, 1);
}
private void incrementStat(CoordinatorStat stat, DataSegment segment, String tier, long value)

CoordinatorRunStats.java

@ -64,12 +64,7 @@ public class CoordinatorRunStats
public long getSegmentStat(CoordinatorStat stat, String tier, String datasource)
{
return get(stat, RowKey.builder().add(Dimension.DATASOURCE, datasource).add(Dimension.TIER, tier).build());
}
public long getDataSourceStat(CoordinatorStat stat, String dataSource)
{
return get(stat, RowKey.forDatasource(dataSource));
return get(stat, RowKey.with(Dimension.DATASOURCE, datasource).and(Dimension.TIER, tier));
}
public long get(CoordinatorStat stat)
@ -87,7 +82,7 @@ public class CoordinatorRunStats
{
allStats.forEach(
(rowKey, stats) -> stats.object2LongEntrySet().fastForEach(
stat -> handler.handle(rowKey.getValues(), stat.getKey(), stat.getLongValue())
stat -> handler.handle(stat.getKey(), rowKey, stat.getLongValue())
)
);
}
@ -199,16 +194,10 @@ public class CoordinatorRunStats
.addTo(stat, value);
}
public void addToDatasourceStat(CoordinatorStat stat, String dataSource, long value)
{
add(stat, RowKey.forDatasource(dataSource), value);
}
public void addToSegmentStat(CoordinatorStat stat, String tier, String datasource, long value)
{
RowKey rowKey = RowKey.builder()
.add(Dimension.TIER, tier)
.add(Dimension.DATASOURCE, datasource).build();
RowKey rowKey = RowKey.with(Dimension.TIER, tier)
.and(Dimension.DATASOURCE, datasource);
add(stat, rowKey, value);
}
@ -267,7 +256,7 @@ public class CoordinatorRunStats
public interface StatHandler
{
void handle(Map<Dimension, String> dimensionValues, CoordinatorStat stat, long statValue);
void handle(CoordinatorStat stat, RowKey rowKey, long statValue);
}
}

RowKey.java

@ -40,19 +40,16 @@ public class RowKey
this.hashCode = Objects.hash(values);
}
public static Builder builder()
public static Builder with(Dimension dimension, String value)
{
return new RowKey.Builder();
Builder builder = new Builder();
builder.with(dimension, value);
return builder;
}
public static RowKey forTier(String tier)
public static RowKey of(Dimension dimension, String value)
{
return RowKey.builder().add(Dimension.TIER, tier).build();
}
public static RowKey forDatasource(String datasource)
{
return RowKey.builder().add(Dimension.DATASOURCE, datasource).build();
return with(dimension, value).build();
}
public Map<Dimension, String> getValues()
@ -83,12 +80,18 @@ public class RowKey
{
private final Map<Dimension, String> values = new EnumMap<>(Dimension.class);
public Builder add(Dimension dimension, String value)
public Builder with(Dimension dimension, String value)
{
values.put(dimension, value);
return this;
}
public RowKey and(Dimension dimension, String value)
{
values.put(dimension, value);
return new RowKey(values);
}
public RowKey build()
{
return new RowKey(values);

Stats.java

@ -131,11 +131,11 @@ public class Stats
public static class Balancer
{
public static final CoordinatorStat RAW_COST
= new CoordinatorStat("initialCost", "segment/cost/raw");
public static final CoordinatorStat NORMALIZATION_COST
= new CoordinatorStat("normaliznCost", "segment/cost/normalization");
public static final CoordinatorStat NORMALIZED_COST_X_1000
= new CoordinatorStat("normalizedCost", "segment/cost/normalized");
public static final CoordinatorStat COMPUTATION_ERRORS
= new CoordinatorStat("costComputeError", "segment/balancer/compute/error");
public static final CoordinatorStat COMPUTATION_TIME
= new CoordinatorStat("costComputeTime", "segment/balancer/compute/time");
public static final CoordinatorStat COMPUTATION_COUNT
= new CoordinatorStat("costComputeCount", "segment/balancer/compute/count");
}
}

CoordinatorRunStatsTest.java

@ -161,14 +161,14 @@ public class CoordinatorRunStatsTest
);
expected.forEach(
(duty, count) ->
stats.add(STAT_1, RowKey.builder().add(Dimension.DUTY, duty).build(), count)
stats.add(STAT_1, RowKey.of(Dimension.DUTY, duty), count)
);
final Map<String, Long> actual = new HashMap<>();
stats.forEachStat(
(dimensionValues, stat, value) -> {
(stat, rowKey, value) -> {
if (stat.equals(STAT_1)) {
actual.put(dimensionValues.get(Dimension.DUTY), value);
actual.put(rowKey.getValues().get(Dimension.DUTY), value);
}
}
);
@ -192,7 +192,7 @@ public class CoordinatorRunStatsTest
debugStats.add(DEBUG_STAT_1, Key.TIER_1, 1);
Assert.assertTrue(debugStats.hasStat(DEBUG_STAT_1));
debugStats.addToDatasourceStat(DEBUG_STAT_2, "wiki", 1);
debugStats.add(DEBUG_STAT_2, RowKey.of(Dimension.DATASOURCE, "wiki"), 1);
Assert.assertFalse(debugStats.hasStat(DEBUG_STAT_2));
}
@ -201,11 +201,11 @@ public class CoordinatorRunStatsTest
*/
private static class Key
{
static final RowKey TIER_1 = RowKey.forTier("tier1");
static final RowKey TIER_2 = RowKey.forTier("tier2");
static final RowKey TIER_1 = RowKey.of(Dimension.TIER, "tier1");
static final RowKey TIER_2 = RowKey.of(Dimension.TIER, "tier2");
static final RowKey DUTY_1 = RowKey.builder().add(Dimension.DUTY, "duty1").build();
static final RowKey DUTY_2 = RowKey.builder().add(Dimension.DUTY, "duty2").build();
static final RowKey DUTY_1 = RowKey.of(Dimension.DUTY, "duty1");
static final RowKey DUTY_2 = RowKey.of(Dimension.DUTY, "duty2");
}
}

CachingCostBalancerStrategyTest.java

@ -38,6 +38,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -102,7 +103,7 @@ public class CachingCostBalancerStrategyTest
.findDestinationServerToMoveSegment(s, firstServer, serverHolderList);
ServerHolder s2 = costBalancerStrategy
.findDestinationServerToMoveSegment(s, firstServer, serverHolderList);
return (s1.getServer().getName().equals(s2.getServer().getName())) ? 0 : 1;
return Objects.equals(s1, s2) ? 0 : 1;
}
)
.sum();

CostBalancerStrategyTest.java

@ -21,17 +21,19 @@ package org.apache.druid.server.coordinator.balancer;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.junit.After;
import org.junit.Assert;
@ -61,7 +63,7 @@ public class CostBalancerStrategyTest
@Before
public void setup()
{
balancerExecutor = new BlockingExecutorService("test-balance-exec-%d");
balancerExecutor = Execs.singleThreaded("test-balance-exec-%d");
strategy = new CostBalancerStrategy(MoreExecutors.listeningDecorator(balancerExecutor));
serviceEmitter = new StubServiceEmitter("test-service", "host");
@ -241,7 +243,7 @@ public class CostBalancerStrategyTest
}
@Test
public void testComputeCost()
public void testComputePlacementCost()
{
// Create segments for different granularities
final List<DataSegment> daySegments =
@ -265,7 +267,7 @@ public class CostBalancerStrategyTest
.withNumPartitions(30)
.eachOfSizeInMb(100);
// Distribute the segments randomly amongst 2 servers
// Distribute the segments randomly amongst 3 servers
final List<DataSegment> segments = new ArrayList<>(daySegments);
segments.addAll(monthSegments);
segments.addAll(yearSegments);
@ -284,35 +286,66 @@ public class CostBalancerStrategyTest
server -> new ServerHolder(server.toImmutableDruidServer(), new LoadQueuePeonTester())
).collect(Collectors.toList());
final ServerHolder serverA = serverHolders.get(0);
final ServerHolder serverB = serverHolders.get(1);
final ServerHolder serverC = serverHolders.get(2);
// Verify costs for DAY, MONTH and YEAR segments
verifyServerCosts(
daySegments.get(0),
serverHolders,
5191.500804, 8691.392080, 6418.467818
);
verifyServerCosts(
monthSegments.get(0),
serverHolders,
301935.940609, 301935.940606, 304333.677669
);
verifyServerCosts(
yearSegments.get(0),
serverHolders,
8468764.380437, 12098919.896931, 14501440.169452
);
final DataSegment daySegment = daySegments.get(0);
verifyPlacementCost(daySegment, serverA, 5191.500804);
verifyPlacementCost(daySegment, serverB, 8691.392080);
verifyPlacementCost(daySegment, serverC, 6418.467818);
final DataSegment monthSegment = monthSegments.get(0);
verifyPlacementCost(monthSegment, serverA, 301935.940609);
verifyPlacementCost(monthSegment, serverB, 301935.940606);
verifyPlacementCost(monthSegment, serverC, 304333.677669);
final DataSegment yearSegment = yearSegments.get(0);
verifyPlacementCost(yearSegment, serverA, 8468764.380437);
verifyPlacementCost(yearSegment, serverB, 12098919.896931);
verifyPlacementCost(yearSegment, serverC, 14501440.169452);
// Verify costs for an ALL granularity segment
DataSegment allGranularitySegment =
final DataSegment allGranularitySegment =
CreateDataSegments.ofDatasource(DS_WIKI)
.forIntervals(1, Granularities.ALL)
.eachOfSizeInMb(100).get(0);
verifyServerCosts(
allGranularitySegment,
serverHolders,
1.1534173737329768e7,
1.6340633534241956e7,
1.9026400521582970e7
verifyPlacementCost(allGranularitySegment, serverA, 1.1534173737329768e7);
verifyPlacementCost(allGranularitySegment, serverB, 1.6340633534241956e7);
verifyPlacementCost(allGranularitySegment, serverC, 1.9026400521582970e7);
}
@Test
public void testGetAndResetStats()
{
final ServerHolder serverA = new ServerHolder(
createHistorical().toImmutableDruidServer(),
new LoadQueuePeonTester()
);
final ServerHolder serverB = new ServerHolder(
createHistorical().toImmutableDruidServer(),
new LoadQueuePeonTester()
);
final DataSegment segment = CreateDataSegments.ofDatasource(DS_WIKI).eachOfSizeInMb(100).get(0);
// Verify that computation stats have been tracked
strategy.findServersToLoadSegment(segment, Arrays.asList(serverA, serverB));
CoordinatorRunStats computeStats = strategy.getAndResetStats();
final RowKey rowKey = RowKey.with(Dimension.DATASOURCE, DS_WIKI)
.with(Dimension.DESCRIPTION, "LOAD")
.and(Dimension.TIER, "hot");
Assert.assertEquals(1L, computeStats.get(Stats.Balancer.COMPUTATION_COUNT, rowKey));
long computeTime = computeStats.get(Stats.Balancer.COMPUTATION_TIME, rowKey);
Assert.assertTrue(computeTime >= 0 && computeTime <= 100);
Assert.assertFalse(computeStats.hasStat(Stats.Balancer.COMPUTATION_ERRORS));
// Verify that stats have been reset
computeStats = strategy.getAndResetStats();
Assert.assertEquals(0, computeStats.rowCount());
}
@Test
@ -334,42 +367,24 @@ public class CostBalancerStrategyTest
);
}
@Test(timeout = 90_000L)
public void testFindServerRaisesAlertOnTimeout()
/**
* Verifies that the cost of placing the segment on the server is as expected.
* Also verifies that this cost is equal to the total joint cost of this segment
* with each segment currently on the server.
*/
private void verifyPlacementCost(DataSegment segment, ServerHolder server, double expectedCost)
{
DataSegment segment = CreateDataSegments.ofDatasource(DS_WIKI)
.forIntervals(1, Granularities.DAY)
.startingAt("2012-10-24")
.eachOfSizeInMb(100).get(0);
double observedCost = strategy.computePlacementCost(segment, server);
Assert.assertEquals(expectedCost, observedCost, DELTA);
final LoadQueuePeonTester peon = new LoadQueuePeonTester();
ServerHolder serverA = new ServerHolder(createHistorical().toImmutableDruidServer(), peon);
ServerHolder serverB = new ServerHolder(createHistorical().toImmutableDruidServer(), peon);
strategy.findServersToLoadSegment(segment, Arrays.asList(serverA, serverB));
List<Event> events = serviceEmitter.getEvents();
Assert.assertEquals(1, events.size());
Assert.assertTrue(events.get(0) instanceof AlertEvent);
AlertEvent alertEvent = (AlertEvent) events.get(0);
Assert.assertEquals(
"Cost balancer strategy timed out in action [findServersToLoadSegment]."
+ " Try setting a higher value of 'balancerComputeThreads'.",
alertEvent.getDescription()
);
}
private void verifyServerCosts(
DataSegment segment,
List<ServerHolder> serverHolders,
double... expectedCosts
)
{
for (int i = 0; i < serverHolders.size(); ++i) {
double observedCost = strategy.computeCost(segment, serverHolders.get(i), true);
Assert.assertEquals(expectedCosts[i], observedCost, DELTA);
double totalJointSegmentCost = 0;
for (DataSegment segmentOnServer : server.getServer().iterateAllSegments()) {
totalJointSegmentCost += CostBalancerStrategy.computeJointSegmentsCost(segment, segmentOnServer);
}
if (server.isServingSegment(segment)) {
totalJointSegmentCost -= CostBalancerStrategy.computeJointSegmentsCost(segment, segment);
}
Assert.assertEquals(totalJointSegmentCost, observedCost, DELTA);
}
private void verifyJointSegmentsCost(

View File

@ -37,7 +37,6 @@ import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
@ -132,13 +131,8 @@ public class BalanceSegmentsTest
@Test
public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMove()
{
final ServerHolder serverHolder1 = createHolder(server1, false, segment1, segment2);
final ServerHolder serverHolder2 = createHolder(server2, true, segment3, segment4);
final ServerHolder serverHolder3 = createHolder(server3, false);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
expectFindDestinationAndReturn(strategy, serverHolder3);
EasyMock.replay(strategy);
final ServerHolder serverHolder1 = createHolder(server1, true, segment1, segment2, segment3, segment4);
final ServerHolder serverHolder2 = createHolder(server2, false);
// ceil(3 * 0.6) = 2 segments from decommissioning servers (see the cap sketch after this test)
CoordinatorDynamicConfig dynamicConfig =
@ -148,24 +142,21 @@ public class BalanceSegmentsTest
.withDecommissioningMaxPercentOfMaxSegmentsToMove(60)
.build();
DruidCoordinatorRuntimeParams params =
defaultRuntimeParamsBuilder(serverHolder1, serverHolder2, serverHolder3)
defaultRuntimeParamsBuilder(serverHolder1, serverHolder2)
.withDynamicConfigs(dynamicConfig)
.withBalancerStrategy(strategy)
.withBalancerStrategy(balancerStrategy)
.withBroadcastDatasources(broadcastDatasources)
.withSegmentAssignerUsing(loadQueueManager)
.build();
CoordinatorRunStats stats = runBalancer(params);
EasyMock.verify(strategy);
// 2 segments are moved from the decommissioning server and 1 from the active server
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource1"));
Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource2"));
Set<DataSegment> segmentsMoved = serverHolder3.getPeon().getSegmentsToLoad();
Assert.assertTrue(segmentsMoved.contains(segment3));
Assert.assertTrue(segmentsMoved.contains(segment4));
Assert.assertTrue(segmentsMoved.contains(segment1) || segmentsMoved.contains(segment2));
// 2 segments are moved from the decommissioning server
long totalMoved = stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource1")
+ stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource2");
Assert.assertEquals(2L, totalMoved);
Set<DataSegment> segmentsMoved = serverHolder2.getPeon().getSegmentsToLoad();
Assert.assertEquals(2, segmentsMoved.size());
}
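A hedged sketch of the arithmetic behind the ceil(3 * 0.6) = 2 comment in the test above. decommissioningMoveCap is a hypothetical helper named here for illustration; the real computation happens inside the coordinator's balancing logic.

// Hypothetical helper mirroring the arithmetic in the comment above.
static int decommissioningMoveCap(int maxSegmentsToMove, int maxPercent)
{
    return (int) Math.ceil(maxSegmentsToMove * (maxPercent / 100.0));
}

// decommissioningMoveCap(3, 60) == 2, matching the two MOVED segments asserted above.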
@Test
@ -220,33 +211,27 @@ public class BalanceSegmentsTest
@Test
public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMoveWithNoDecommissioning()
{
final ServerHolder serverHolder1 = createHolder(server1, segment1, segment2);
final ServerHolder serverHolder2 = createHolder(server2, segment3, segment4);
final ServerHolder serverHolder3 = createHolder(server3);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
expectFindDestinationAndReturn(strategy, serverHolder3);
EasyMock.replay(strategy);
final ServerHolder serverHolder1 = createHolder(server1, segment1, segment2, segment3, segment4);
final ServerHolder serverHolder2 = createHolder(server2);
CoordinatorDynamicConfig dynamicConfig =
CoordinatorDynamicConfig.builder()
.withSmartSegmentLoading(false)
.withMaxSegmentsToMove(3)
.withMaxSegmentsToMove(4)
.withDecommissioningMaxPercentOfMaxSegmentsToMove(9)
.build();
DruidCoordinatorRuntimeParams params =
defaultRuntimeParamsBuilder(serverHolder1, serverHolder2, serverHolder3)
defaultRuntimeParamsBuilder(serverHolder1, serverHolder2)
.withDynamicConfigs(dynamicConfig)
.withBalancerStrategy(strategy)
.withBalancerStrategy(balancerStrategy)
.withSegmentAssignerUsing(loadQueueManager)
.build();
CoordinatorRunStats stats = runBalancer(params);
EasyMock.verify(strategy);
long totalMoved = stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource1")
+ stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource2");
Assert.assertEquals(3L, totalMoved);
Assert.assertEquals(3, serverHolder3.getPeon().getSegmentsToLoad().size());
Assert.assertEquals(2L, totalMoved);
Assert.assertEquals(2, serverHolder2.getPeon().getSegmentsToLoad().size());
}
/**
@ -258,18 +243,13 @@ public class BalanceSegmentsTest
final ServerHolder activeServer = createHolder(server1, false, allSegments);
final ServerHolder decommissioningServer = createHolder(server2, true);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
expectFindDestinationAndReturn(strategy, decommissioningServer);
EasyMock.replay(strategy);
DruidCoordinatorRuntimeParams params =
defaultRuntimeParamsBuilder(activeServer, decommissioningServer)
.withBalancerStrategy(strategy)
.withBalancerStrategy(balancerStrategy)
.withBroadcastDatasources(broadcastDatasources)
.build();
CoordinatorRunStats stats = runBalancer(params);
EasyMock.verify(strategy);
Assert.assertFalse(stats.hasStat(Stats.Segments.MOVED));
}
@ -279,22 +259,17 @@ public class BalanceSegmentsTest
final ServerHolder decommissioningServer = createHolder(server1, true, allSegments);
final ServerHolder activeServer = createHolder(server2);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
expectFindDestinationAndReturn(strategy, activeServer);
EasyMock.replay(strategy);
DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(decommissioningServer, activeServer)
.withDynamicConfigs(
CoordinatorDynamicConfig.builder()
.withSmartSegmentLoading(false)
.withMaxSegmentsToMove(3).build()
)
.withBalancerStrategy(strategy)
.withBalancerStrategy(balancerStrategy)
.withBroadcastDatasources(broadcastDatasources)
.build();
runBalancer(params);
EasyMock.verify(strategy);
Assert.assertEquals(0, decommissioningServer.getPeon().getSegmentsToLoad().size());
Assert.assertEquals(3, activeServer.getPeon().getSegmentsToLoad().size());
}
@ -358,10 +333,6 @@ public class BalanceSegmentsTest
final ServerHolder holder2 = createHolder(server2, segment3, segment4);
final ServerHolder holder3 = createHolder(server3);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
expectFindDestinationAndReturn(strategy, holder3);
EasyMock.replay(strategy);
DruidCoordinatorRuntimeParams params =
defaultRuntimeParamsBuilder(holder1, holder2, holder3)
.withDynamicConfigs(
@ -372,12 +343,11 @@ public class BalanceSegmentsTest
.withPercentOfSegmentsToConsiderPerMove(40)
.build()
)
.withBalancerStrategy(strategy)
.withBalancerStrategy(balancerStrategy)
.withBroadcastDatasources(broadcastDatasources)
.build();
CoordinatorRunStats stats = runBalancer(params);
EasyMock.verify(strategy);
long totalMoved = stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource1")
+ stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource2");
Assert.assertEquals(1L, totalMoved);
@ -463,17 +433,6 @@ public class BalanceSegmentsTest
);
}
private void expectFindDestinationAndReturn(BalancerStrategy strategy, ServerHolder chosenServer)
{
EasyMock.expect(
strategy.findDestinationServerToMoveSegment(
EasyMock.anyObject(),
EasyMock.anyObject(),
EasyMock.anyObject()
)
).andReturn(chosenServer).anyTimes();
}
private DataSegment createHourlySegment(String datasource, DateTime start, String version)
{
return new DataSegment(

View File

@ -2037,7 +2037,7 @@ public class CompactSegmentsTest
// all the datasources in the coordinator stats
final AtomicInteger numDatasources = new AtomicInteger();
stats.forEachStat(
(dimensionValues, stat, value) -> {
(stat, rowKey, value) -> {
if (stat.equals(Stats.Compaction.PENDING_BYTES)
&& (expectedRemainingSegments <= 0 || value == expectedRemainingSegments)) {
numDatasources.incrementAndGet();

View File

@ -76,6 +76,7 @@ public class RunRulesTest
{
private static final long SERVER_SIZE_10GB = 10L << 30;
private static final String DATASOURCE = "test";
private static final RowKey DATASOURCE_STAT_KEY = RowKey.of(Dimension.DATASOURCE, DATASOURCE);
private LoadQueuePeon mockPeon;
private RunRules ruleRunner;
@ -563,7 +564,7 @@ public class RunRulesTest
.build();
CoordinatorRunStats stats = runDutyAndGetStats(params);
Assert.assertEquals(12L, stats.getDataSourceStat(Stats.Segments.DELETED, DATASOURCE));
Assert.assertEquals(12L, stats.get(Stats.Segments.DELETED, DATASOURCE_STAT_KEY));
}
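Both spellings of the row key appear in this diff: the removed side uses the RowKey.builder().add(dim, value).build() chain, the added side uses RowKey.of(dim, value). Presumably `of` is a single-dimension convenience factory shaped roughly like this sketch (the actual implementation in org.apache.druid.server.coordinator.stats may differ):

// Sketch of the assumed shorthand, equivalent to the one-dimension builder chain.
// Declared inside RowKey:
public static RowKey of(Dimension dimension, String value)
{
    return RowKey.builder().add(dimension, value).build();
}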
@Test
@ -616,7 +617,7 @@ public class RunRulesTest
CoordinatorRunStats stats = runDutyAndGetStats(params);
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, "normal", DATASOURCE));
Assert.assertEquals(12L, stats.getDataSourceStat(Stats.Segments.DELETED, DATASOURCE));
Assert.assertEquals(12L, stats.get(Stats.Segments.DELETED, DATASOURCE_STAT_KEY));
EasyMock.verify(mockPeon);
}
@ -662,7 +663,7 @@ public class RunRulesTest
CoordinatorRunStats stats = runDutyAndGetStats(params);
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, "normal", DATASOURCE));
Assert.assertEquals(12L, stats.getDataSourceStat(Stats.Segments.DELETED, DATASOURCE));
Assert.assertEquals(12L, stats.get(Stats.Segments.DELETED, DATASOURCE_STAT_KEY));
EasyMock.verify(mockPeon);
}
@ -705,7 +706,7 @@ public class RunRulesTest
CoordinatorRunStats stats = runDutyAndGetStats(params);
Assert.assertFalse(stats.hasStat(Stats.Segments.DROPPED));
Assert.assertEquals(12L, stats.getDataSourceStat(Stats.Segments.DELETED, DATASOURCE));
Assert.assertEquals(12L, stats.get(Stats.Segments.DELETED, DATASOURCE_STAT_KEY));
EasyMock.verify(mockPeon);
}
@ -1188,7 +1189,7 @@ public class RunRulesTest
.build();
CoordinatorRunStats stats = runDutyAndGetStats(params);
final RowKey tierRowKey = RowKey.builder().add(Dimension.TIER, DruidServer.DEFAULT_TIER).build();
final RowKey tierRowKey = RowKey.of(Dimension.TIER, DruidServer.DEFAULT_TIER);
Assert.assertEquals(
dataSegment.getSize() * numReplicants,
stats.get(Stats.Tier.REQUIRED_CAPACITY, tierRowKey)
@ -1250,7 +1251,7 @@ public class RunRulesTest
.build();
CoordinatorRunStats stats = runDutyAndGetStats(params);
final RowKey tierRowKey = RowKey.builder().add(Dimension.TIER, DruidServer.DEFAULT_TIER).build();
final RowKey tierRowKey = RowKey.of(Dimension.TIER, DruidServer.DEFAULT_TIER);
Assert.assertEquals(
dataSegment.getSize() * numReplicants,
stats.get(Stats.Tier.REQUIRED_CAPACITY, tierRowKey)

View File

@ -19,7 +19,6 @@
package org.apache.druid.server.coordinator.rules;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@ -77,11 +76,8 @@ public class LoadRuleTest
private ListeningExecutorService exec;
private BalancerStrategy balancerStrategy;
private CachingCostBalancerStrategy cachingCostBalancerStrategy;
private SegmentLoadQueueManager loadQueueManager;
private final boolean useRoundRobinAssignment;
private BalancerStrategy mockBalancerStrategy;
private final AtomicInteger serverId = new AtomicInteger();
@ -101,9 +97,6 @@ public class LoadRuleTest
{
exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "LoadRuleTest-%d"));
balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
cachingCostBalancerStrategy = new CachingCostBalancerStrategy(ClusterCostCache.builder().build(), exec);
mockBalancerStrategy = EasyMock.createMock(BalancerStrategy.class);
loadQueueManager = new SegmentLoadQueueManager(null, null, null);
}
@ -119,13 +112,7 @@ public class LoadRuleTest
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
if (!useRoundRobinAssignment) {
EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.times(2);
}
EasyMock.replay(mockPeon, mockBalancerStrategy);
EasyMock.replay(mockPeon);
DruidCluster druidCluster = DruidCluster
.builder()
@ -140,14 +127,10 @@ public class LoadRuleTest
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI));
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T2, DS_WIKI));
EasyMock.verify(mockPeon, mockBalancerStrategy);
EasyMock.verify(mockPeon);
}
private CoordinatorRunStats runRuleAndGetStats(
LoadRule rule,
DataSegment segment,
DruidCluster cluster
)
private CoordinatorRunStats runRuleAndGetStats(LoadRule rule, DataSegment segment, DruidCluster cluster)
{
return runRuleAndGetStats(rule, segment, makeCoordinatorRuntimeParams(cluster, segment));
}
@ -160,7 +143,7 @@ public class LoadRuleTest
{
final StrategicSegmentAssigner segmentAssigner = params.getSegmentAssigner();
rule.run(segment, segmentAssigner);
return segmentAssigner.getStats();
return params.getCoordinatorStats();
}
private DruidCoordinatorRuntimeParams makeCoordinatorRuntimeParams(
@ -171,7 +154,7 @@ public class LoadRuleTest
return DruidCoordinatorRuntimeParams
.newBuilder(DateTimes.nowUtc())
.withDruidCluster(druidCluster)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerStrategy(balancerStrategy)
.withUsedSegmentsInTest(usedSegments)
.withDynamicConfigs(
CoordinatorDynamicConfig.builder()
@ -189,16 +172,7 @@ public class LoadRuleTest
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
final DataSegment segment = createDataSegment(DS_WIKI);
EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.anyTimes();
EasyMock.replay(mockPeon, mockBalancerStrategy);
EasyMock.replay(mockPeon);
ImmutableDruidServer server1 = createServer(Tier.T1).toImmutableDruidServer();
ImmutableDruidServer server2 = createServer(Tier.T1).toImmutableDruidServer();
@ -207,6 +181,8 @@ public class LoadRuleTest
.addTier(Tier.T1, new ServerHolder(server1, mockPeon), new ServerHolder(server2, mockPeon))
.build();
final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
final DataSegment segment = createDataSegment(DS_WIKI);
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster);
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, segment.getDataSource()));
@ -223,7 +199,7 @@ public class LoadRuleTest
Assert.assertFalse(statsAfterLoadPrimary.hasStat(Stats.Segments.ASSIGNED));
EasyMock.verify(mockPeon, mockBalancerStrategy);
EasyMock.verify(mockPeon);
}
@Test
@ -233,16 +209,7 @@ public class LoadRuleTest
final LoadQueuePeon emptyPeon = createEmptyPeon();
emptyPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
final DataSegment segment = createDataSegment(DS_WIKI);
EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.anyTimes();
EasyMock.replay(emptyPeon, mockBalancerStrategy);
EasyMock.replay(emptyPeon);
ImmutableDruidServer server1 = createServer(Tier.T1).toImmutableDruidServer();
ImmutableDruidServer server2 = createServer(Tier.T1).toImmutableDruidServer();
@ -251,6 +218,8 @@ public class LoadRuleTest
.addTier(Tier.T1, new ServerHolder(server1, emptyPeon), new ServerHolder(server2, emptyPeon))
.build();
final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
final DataSegment segment = createDataSegment(DS_WIKI);
CoordinatorRunStats stats = runRuleAndGetStats(
rule,
segment,
@ -277,7 +246,7 @@ public class LoadRuleTest
Assert.assertEquals(1L, statsAfterLoadPrimary.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI));
EasyMock.verify(emptyPeon, mockBalancerStrategy);
EasyMock.verify(emptyPeon);
}
@Test
@ -286,16 +255,7 @@ public class LoadRuleTest
final LoadQueuePeon emptyPeon = createEmptyPeon();
emptyPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
final DataSegment segment = createDataSegment(DS_WIKI);
EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.anyTimes();
EasyMock.replay(emptyPeon, mockBalancerStrategy);
EasyMock.replay(emptyPeon);
ImmutableDruidServer server1 = createServer(Tier.T1).toImmutableDruidServer();
ImmutableDruidServer server2 = createServer(Tier.T1).toImmutableDruidServer();
@ -304,6 +264,8 @@ public class LoadRuleTest
.addTier(Tier.T1, new ServerHolder(server1, emptyPeon), new ServerHolder(server2, emptyPeon))
.build();
final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
final DataSegment segment = createDataSegment(DS_WIKI);
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster);
// Ensure that the segment is assigned to one of the historicals
@ -323,14 +285,12 @@ public class LoadRuleTest
Assert.assertFalse(statsAfterLoadPrimary.hasStat(Stats.Segments.ASSIGNED));
EasyMock.verify(emptyPeon, mockBalancerStrategy);
EasyMock.verify(emptyPeon);
}
@Test
public void testLoadUsedSegmentsForAllSegmentGranularityAndCachingCostBalancerStrategy()
{
LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
final List<DataSegment> segments =
CreateDataSegments.ofDatasource(DS_WIKI)
.forIntervals(1, Granularities.ALL)
@ -338,21 +298,18 @@ public class LoadRuleTest
.eachOfSizeInMb(100);
final LoadQueuePeon loadingPeon = createLoadingPeon(segments.get(0), true);
loadingPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().once();
EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(cachingCostBalancerStrategy)
.anyTimes();
EasyMock.replay(loadingPeon, mockBalancerStrategy);
EasyMock.replay(loadingPeon);
DruidCluster druidCluster = DruidCluster
.builder()
.addTier(Tier.T1, createServerHolder(Tier.T1, loadingPeon, false))
.build();
balancerStrategy = new CachingCostBalancerStrategy(ClusterCostCache.builder().build(), exec);
LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
CoordinatorRunStats stats = runRuleAndGetStats(
rule,
segments.get(1),
@ -360,7 +317,7 @@ public class LoadRuleTest
);
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI));
EasyMock.verify(loadingPeon, mockBalancerStrategy);
EasyMock.verify(loadingPeon);
}
@Test
@ -369,10 +326,7 @@ public class LoadRuleTest
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockBalancerStrategy.pickServersToDropSegment(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.times(4);
EasyMock.replay(mockPeon, mockBalancerStrategy);
EasyMock.replay(mockPeon);
final DataSegment segment = createDataSegment(DS_WIKI);
@ -407,14 +361,7 @@ public class LoadRuleTest
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
if (!useRoundRobinAssignment) {
EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.times(1);
}
EasyMock.replay(mockPeon, mockBalancerStrategy);
EasyMock.replay(mockPeon);
DruidCluster druidCluster = DruidCluster
.builder()
@ -428,7 +375,7 @@ public class LoadRuleTest
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI));
EasyMock.verify(mockPeon, mockBalancerStrategy);
EasyMock.verify(mockPeon);
}
@Test
@ -437,7 +384,7 @@ public class LoadRuleTest
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(mockPeon, mockBalancerStrategy);
EasyMock.replay(mockPeon);
final DataSegment segment = createDataSegment(DS_WIKI);
@ -460,19 +407,12 @@ public class LoadRuleTest
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, Tier.T1, DS_WIKI));
EasyMock.verify(mockPeon, mockBalancerStrategy);
EasyMock.verify(mockPeon);
}
@Test
public void testMaxLoadingQueueSize()
{
if (!useRoundRobinAssignment) {
EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.times(2);
}
EasyMock.replay(mockBalancerStrategy);
final LoadQueuePeonTester peon = new LoadQueuePeonTester();
final int maxSegmentsInQueue = 2;
@ -482,10 +422,7 @@ public class LoadRuleTest
Tier.T1,
new ServerHolder(
createServer(Tier.T1).toImmutableDruidServer(),
peon,
false,
maxSegmentsInQueue,
10
peon, false, maxSegmentsInQueue, 10
)
)
.build();
@ -497,7 +434,7 @@ public class LoadRuleTest
DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
.newBuilder(DateTimes.nowUtc())
.withDruidCluster(druidCluster)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerStrategy(balancerStrategy)
.withUsedSegmentsInTest(dataSegment1, dataSegment2, dataSegment3)
.withDynamicConfigs(
CoordinatorDynamicConfig.builder()
@ -517,8 +454,6 @@ public class LoadRuleTest
Assert.assertEquals(1L, stats1.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, dataSegment1.getDataSource()));
Assert.assertEquals(1L, stats2.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, dataSegment2.getDataSource()));
Assert.assertEquals(0L, stats3.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, dataSegment3.getDataSource()));
EasyMock.verify(mockBalancerStrategy);
}
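testMaxLoadingQueueSize pushes three assignments through a ServerHolder capped at maxSegmentsInQueue = 2 and expects the 1, 1, 0 ASSIGNED counts asserted above. A hedged sketch of the gating check (hypothetical helper, not the actual ServerHolder code):

// Hypothetical queue-capacity check: a segment can be queued only while the
// peon's load queue is below the configured cap.
static boolean canLoadSegment(int segmentsAlreadyQueued, int maxSegmentsInQueue)
{
    return segmentsAlreadyQueued < maxSegmentsInQueue;
}

// With maxSegmentsInQueue = 2: the first two assignments pass and the third
// is rejected, matching the assertions above.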
/**
@ -530,14 +465,7 @@ public class LoadRuleTest
{
final LoadQueuePeon mockPeon1 = createEmptyPeon();
final LoadQueuePeon mockPeon2 = createOneCallPeonMock();
if (!useRoundRobinAssignment) {
EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.times(1);
}
EasyMock.replay(mockPeon1, mockPeon2, mockBalancerStrategy);
EasyMock.replay(mockPeon1, mockPeon2);
DruidCluster druidCluster = DruidCluster
.builder()
@ -550,7 +478,7 @@ public class LoadRuleTest
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster);
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T2, DS_WIKI));
EasyMock.verify(mockPeon1, mockPeon2, mockBalancerStrategy);
EasyMock.verify(mockPeon1, mockPeon2);
}
/**
@ -572,15 +500,6 @@ public class LoadRuleTest
ServerHolder holder4 = createServerHolder(Tier.T2, mockPeon4, false);
final DataSegment segment = createDataSegment(DS_WIKI);
if (!useRoundRobinAssignment) {
EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(segment, ImmutableList.of(holder2)))
.andReturn(Collections.singletonList(holder2).iterator());
EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(segment, ImmutableList.of(holder4, holder3)))
.andReturn(Arrays.asList(holder3, holder4).iterator());
}
EasyMock.replay(mockBalancerStrategy);
DruidCluster druidCluster = DruidCluster
.builder()
.addTier(Tier.T1, holder1, holder2)
@ -593,7 +512,7 @@ public class LoadRuleTest
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI));
Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T2, DS_WIKI));
EasyMock.verify(mockPeon1, mockPeon2, mockPeon3, mockPeon4, mockBalancerStrategy);
EasyMock.verify(mockPeon1, mockPeon2, mockPeon3, mockPeon4);
}
/**
@ -606,10 +525,7 @@ public class LoadRuleTest
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().times(2);
EasyMock.expect(mockBalancerStrategy.pickServersToDropSegment(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.times(4);
EasyMock.replay(mockPeon, mockBalancerStrategy);
EasyMock.replay(mockPeon);
final DataSegment segment1 = createDataSegment("foo1");
final DataSegment segment2 = createDataSegment("foo2");
@ -652,10 +568,6 @@ public class LoadRuleTest
final LoadQueuePeon mockPeon1 = new LoadQueuePeonTester();
final LoadQueuePeon mockPeon2 = new LoadQueuePeonTester();
final LoadQueuePeon mockPeon3 = new LoadQueuePeonTester();
EasyMock.expect(mockBalancerStrategy.pickServersToDropSegment(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.times(4);
EasyMock.replay(mockBalancerStrategy);
final DataSegment segment1 = createDataSegment(DS_WIKI);
@ -751,11 +663,11 @@ public class LoadRuleTest
return mockPeon2;
}
private ServerHolder createServerHolder(String tier, LoadQueuePeon mockPeon1, boolean isDecommissioning)
private ServerHolder createServerHolder(String tier, LoadQueuePeon loadQueuePeon, boolean isDecommissioning)
{
return new ServerHolder(
createServer(tier).toImmutableDruidServer(),
mockPeon1,
loadQueuePeon,
isDecommissioning
);
}