Priority on loading for primary replica (#4757)

* Priority on loading for primary replica

* Simplicity fixes

* Fix on skipping drop for quick return.

* change to debug logging for no replicants.

* Fix on filter logic

* swapping if-else

* Fix on wrong "hasTier" logic

* Refactoring of LoadRule

* Rename createPredicate to createLoadQueueSizeLimitingPredicate

* Rename getHolderList to getFilteredHolders

* remove varargs

* extract out currentReplicantsInTier

* rename holders to holdersInTier

* don't do temporary removal of tier.

* rename primaryTier to tierToSkip

* change LinkedList to ArrayList

* Change MinMaxPriorityQueue in DruidCluster to TreeSet.

* Adding some comments.

* Modify log messages in light of predicates.

* Add in-method comments

* Don't create new Object2IntOpenHashMap for each run() call.

* Cache result from strategy call in the primary assignment to be reused during the same run.

* Spelling mistake

* Cleaning up javadoc.

* refactor out loading in progress check.

* Removed redundant comment.

* Removed forbidden API

* Correct non-forbidden API.

* Precision in variable type for NavigableSet.

* Obsolete comment.

* Clarity in method call and moving retrieval of ServerHolder into method call.

* Comment on mutability of CoordinatoorStats.

* Added auxiliary fixture for dropping.
This commit is contained in:
Goh Wei Xiang 2017-09-28 13:02:05 -07:00 committed by Charles Allen
parent a19f22b5bb
commit 26fd2b3a8e
15 changed files with 1019 additions and 890 deletions

View File

@ -20,19 +20,20 @@
package io.druid.server.coordinator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;
import io.druid.client.ImmutableDruidServer;
import io.druid.java.util.common.IAE;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
/**
* Contains a representation of the current state of the cluster by tier.
@ -41,7 +42,7 @@ import java.util.Set;
public class DruidCluster
{
private final Set<ServerHolder> realtimes;
private final Map<String, MinMaxPriorityQueue<ServerHolder>> historicals;
private final Map<String, NavigableSet<ServerHolder>> historicals;
public DruidCluster()
{
@ -52,7 +53,7 @@ public class DruidCluster
@VisibleForTesting
public DruidCluster(
@Nullable Set<ServerHolder> realtimes,
Map<String, MinMaxPriorityQueue<ServerHolder>> historicals
Map<String, NavigableSet<ServerHolder>> historicals
)
{
this.realtimes = realtimes == null ? new HashSet<>() : new HashSet<>(realtimes);
@ -86,9 +87,9 @@ public class DruidCluster
private void addHistorical(ServerHolder serverHolder)
{
final ImmutableDruidServer server = serverHolder.getServer();
final MinMaxPriorityQueue<ServerHolder> tierServers = historicals.computeIfAbsent(
final NavigableSet<ServerHolder> tierServers = historicals.computeIfAbsent(
server.getTier(),
k -> MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create()
k -> new TreeSet<>(Collections.reverseOrder())
);
tierServers.add(serverHolder);
}
@ -98,7 +99,7 @@ public class DruidCluster
return realtimes;
}
public Map<String, MinMaxPriorityQueue<ServerHolder>> getHistoricals()
public Map<String, NavigableSet<ServerHolder>> getHistoricals()
{
return historicals;
}
@ -108,7 +109,7 @@ public class DruidCluster
return historicals.keySet();
}
public MinMaxPriorityQueue<ServerHolder> getHistoricalsByTier(String tier)
public NavigableSet<ServerHolder> getHistoricalsByTier(String tier)
{
return historicals.get(tier);
}
@ -124,7 +125,7 @@ public class DruidCluster
return allServers;
}
public Iterable<MinMaxPriorityQueue<ServerHolder>> getSortedHistoricalsByTier()
public Iterable<NavigableSet<ServerHolder>> getSortedHistoricalsByTier()
{
return historicals.values();
}
@ -146,7 +147,7 @@ public class DruidCluster
public boolean hasTier(String tier)
{
MinMaxPriorityQueue<ServerHolder> servers = historicals.get(tier);
return (servers == null) || servers.isEmpty();
NavigableSet<ServerHolder> servers = historicals.get(tier);
return (servers != null) && !servers.isEmpty();
}
}

View File

@ -21,12 +21,12 @@ package io.druid.server.coordinator;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Table;
import io.druid.client.ImmutableDruidServer;
import io.druid.timeline.DataSegment;
import java.util.Map;
import java.util.SortedSet;
/**
* A lookup for the number of replicants of a given segment for a certain tier.
@ -38,7 +38,7 @@ public class SegmentReplicantLookup
final Table<String, String, Integer> segmentsInCluster = HashBasedTable.create();
final Table<String, String, Integer> loadingSegments = HashBasedTable.create();
for (MinMaxPriorityQueue<ServerHolder> serversByType : cluster.getSortedHistoricalsByTier()) {
for (SortedSet<ServerHolder> serversByType : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serversByType) {
ImmutableDruidServer server = serverHolder.getServer();

View File

@ -19,6 +19,7 @@
package io.druid.server.coordinator;
import com.google.common.primitives.Longs;
import io.druid.client.ImmutableDruidServer;
import io.druid.java.util.common.logger.Logger;
import io.druid.timeline.DataSegment;
@ -52,32 +53,32 @@ public class ServerHolder implements Comparable<ServerHolder>
return peon;
}
public Long getMaxSize()
public long getMaxSize()
{
return server.getMaxSize();
}
public Long getCurrServerSize()
public long getCurrServerSize()
{
return server.getCurrSize();
}
public Long getLoadQueueSize()
public long getLoadQueueSize()
{
return peon.getLoadQueueSize();
}
public Long getSizeUsed()
public long getSizeUsed()
{
return getCurrServerSize() + getLoadQueueSize();
}
public Double getPercentUsed()
public double getPercentUsed()
{
return (100 * getSizeUsed().doubleValue()) / getMaxSize();
return (100.0 * getSizeUsed()) / getMaxSize();
}
public Long getAvailableSize()
public long getAvailableSize()
{
long maxSize = getMaxSize();
long sizeUsed = getSizeUsed();
@ -114,7 +115,22 @@ public class ServerHolder implements Comparable<ServerHolder>
@Override
public int compareTo(ServerHolder serverHolder)
{
return getAvailableSize().compareTo(serverHolder.getAvailableSize());
int result = Longs.compare(getAvailableSize(), serverHolder.getAvailableSize());
if (result != 0) {
return result;
}
result = server.getHost().compareTo(serverHolder.server.getHost());
if (result != 0) {
return result;
}
result = server.getTier().compareTo(serverHolder.server.getTier());
if (result != 0) {
return result;
}
return server.getType().compareTo(serverHolder.server.getType());
}
@Override

View File

@ -20,7 +20,6 @@
package io.druid.server.coordinator.helper;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.ImmutableDruidServer;
import io.druid.java.util.common.StringUtils;
@ -38,6 +37,8 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;
/**
@ -78,7 +79,7 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
final CoordinatorStats stats = new CoordinatorStats();
params.getDruidCluster().getHistoricals().forEach((String tier, MinMaxPriorityQueue<ServerHolder> servers) -> {
params.getDruidCluster().getHistoricals().forEach((String tier, NavigableSet<ServerHolder> servers) -> {
balanceTier(params, tier, servers, stats);
});
return params.buildFromExisting().withCoordinatorStats(stats).build();
@ -87,7 +88,7 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
private void balanceTier(
DruidCoordinatorRuntimeParams params,
String tier,
MinMaxPriorityQueue<ServerHolder> servers,
SortedSet<ServerHolder> servers,
CoordinatorStats stats
)
{

View File

@ -20,8 +20,6 @@
package io.druid.server.coordinator.helper;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import io.druid.client.ImmutableDruidDataSource;
import io.druid.client.ImmutableDruidServer;
import io.druid.java.util.common.guava.Comparators;
@ -34,6 +32,7 @@ import io.druid.timeline.DataSegment;
import io.druid.timeline.VersionedIntervalTimeline;
import java.util.Map;
import java.util.SortedSet;
public class DruidCoordinatorCleanupOvershadowed implements DruidCoordinatorHelper
{
@ -55,7 +54,7 @@ public class DruidCoordinatorCleanupOvershadowed implements DruidCoordinatorHelp
DruidCluster cluster = params.getDruidCluster();
Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = Maps.newHashMap();
for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
for (SortedSet<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serverHolders) {
ImmutableDruidServer server = serverHolder.getServer();

View File

@ -19,8 +19,6 @@
package io.druid.server.coordinator.helper;
import com.google.common.collect.MinMaxPriorityQueue;
import io.druid.client.ImmutableDruidDataSource;
import io.druid.client.ImmutableDruidServer;
import io.druid.java.util.common.logger.Logger;
@ -34,6 +32,7 @@ import io.druid.server.coordinator.ServerHolder;
import io.druid.timeline.DataSegment;
import java.util.Set;
import java.util.SortedSet;
/**
*/
@ -64,7 +63,7 @@ public class DruidCoordinatorCleanupUnneeded implements DruidCoordinatorHelper
// This is done to prevent a race condition in which the coordinator would drop all segments if it started running
// cleanup before it finished polling the metadata storage for available segments for the first time.
if (!availableSegments.isEmpty()) {
for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
for (SortedSet<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serverHolders) {
ImmutableDruidServer server = serverHolder.getServer();

View File

@ -19,7 +19,6 @@
package io.druid.server.coordinator.helper;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.DruidDataSource;
@ -171,7 +170,7 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper
);
log.info("Load Queues:");
for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
for (Iterable<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serverHolders) {
ImmutableDruidServer server = serverHolder.getServer();
LoadQueuePeon queuePeon = serverHolder.getPeon();

View File

@ -19,24 +19,26 @@
package io.druid.server.coordinator.rules;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.emitter.EmittingLogger;
import io.druid.java.util.common.IAE;
import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCluster;
import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.LoadPeonCallback;
import io.druid.server.coordinator.ReplicationThrottler;
import io.druid.server.coordinator.ServerHolder;
import io.druid.timeline.DataSegment;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@ -49,188 +51,326 @@ public abstract class LoadRule implements Rule
static final String ASSIGNED_COUNT = "assignedCount";
static final String DROPPED_COUNT = "droppedCount";
private final Object2IntMap<String> targetReplicants = new Object2IntOpenHashMap<>();
private final Object2IntMap<String> currentReplicants = new Object2IntOpenHashMap<>();
// Cache to hold unused results from strategy call in assignPrimary
private final Map<String, ServerHolder> strategyCache = new HashMap<>();
@Override
public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment)
{
final CoordinatorStats stats = new CoordinatorStats();
final Set<DataSegment> availableSegments = params.getAvailableSegments();
final Map<String, Integer> loadStatus = Maps.newHashMap();
int totalReplicantsInCluster = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier());
for (Map.Entry<String, Integer> entry : getTieredReplicants().entrySet()) {
final String tier = entry.getKey();
final int expectedReplicantsInTier = entry.getValue();
final int totalReplicantsInTier = params.getSegmentReplicantLookup()
.getTotalReplicants(segment.getIdentifier(), tier);
final int loadedReplicantsInTier = params.getSegmentReplicantLookup()
.getLoadedReplicants(segment.getIdentifier(), tier);
final MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getHistoricalsByTier(tier);
if (serverQueue == null) {
log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit();
continue;
}
final int maxSegmentsInNodeLoadingQueue = params.getCoordinatorDynamicConfig()
.getMaxSegmentsInNodeLoadingQueue();
Predicate<ServerHolder> serverHolderPredicate;
if (maxSegmentsInNodeLoadingQueue > 0) {
serverHolderPredicate = s -> (s != null && s.getNumberOfSegmentsInQueue() < maxSegmentsInNodeLoadingQueue);
} else {
serverHolderPredicate = Objects::nonNull;
}
final List<ServerHolder> serverHolderList = serverQueue.stream()
.filter(serverHolderPredicate)
.collect(Collectors.toList());
final BalancerStrategy strategy = params.getBalancerStrategy();
if (availableSegments.contains(segment)) {
int assignedCount = assign(
params.getReplicationManager(),
tier,
totalReplicantsInCluster,
expectedReplicantsInTier,
totalReplicantsInTier,
strategy,
serverHolderList,
segment
);
stats.addToTieredStat(ASSIGNED_COUNT, tier, assignedCount);
totalReplicantsInCluster += assignedCount;
}
loadStatus.put(tier, expectedReplicantsInTier - loadedReplicantsInTier);
}
// Remove over-replication
stats.accumulate(drop(loadStatus, segment, params));
return stats;
}
private int assign(
final ReplicationThrottler replicationManager,
final String tier,
final int totalReplicantsInCluster,
final int expectedReplicantsInTier,
final int totalReplicantsInTier,
final BalancerStrategy strategy,
final List<ServerHolder> serverHolderList,
public CoordinatorStats run(
final DruidCoordinator coordinator,
final DruidCoordinatorRuntimeParams params,
final DataSegment segment
)
{
int assignedCount = 0;
int currReplicantsInTier = totalReplicantsInTier;
int currTotalReplicantsInCluster = totalReplicantsInCluster;
while (currReplicantsInTier < expectedReplicantsInTier) {
boolean replicate = currTotalReplicantsInCluster > 0;
try {
// get the "snapshots" of targetReplicants and currentReplicants for assignments.
targetReplicants.putAll(getTieredReplicants());
currentReplicants.putAll(params.getSegmentReplicantLookup().getClusterTiers(segment.getIdentifier()));
if (replicate && !replicationManager.canCreateReplicant(tier)) {
break;
final CoordinatorStats stats = new CoordinatorStats();
if (params.getAvailableSegments().contains(segment)) {
assign(params, segment, stats);
}
final ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
drop(params, segment, stats);
if (holder == null) {
log.warn(
"Not enough [%s] servers or node capacity to assign segment[%s]! Expected Replicants[%d]",
tier,
segment.getIdentifier(),
expectedReplicantsInTier
);
break;
return stats;
}
if (replicate) {
replicationManager.registerReplicantCreation(
tier, segment.getIdentifier(), holder.getServer().getHost()
);
}
holder.getPeon().loadSegment(
segment,
new LoadPeonCallback()
{
@Override
public void execute()
{
replicationManager.unregisterReplicantCreation(
tier,
segment.getIdentifier(),
holder.getServer().getHost()
);
finally {
targetReplicants.clear();
currentReplicants.clear();
strategyCache.clear();
}
}
);
++assignedCount;
++currReplicantsInTier;
++currTotalReplicantsInCluster;
}
return assignedCount;
}
private CoordinatorStats drop(
final Map<String, Integer> loadStatus,
/**
* @param stats {@link CoordinatorStats} to accumulate assignment statistics.
*/
private void assign(
final DruidCoordinatorRuntimeParams params,
final DataSegment segment,
final CoordinatorStats stats
)
{
// if primary replica already exists
if (!currentReplicants.isEmpty()) {
assignReplicas(params, segment, stats, null);
} else {
final ServerHolder primaryHolderToLoad = assignPrimary(params, segment);
if (primaryHolderToLoad == null) {
// cluster does not have any replicants and cannot identify primary holder
// this implies that no assignment could be done
return;
}
int numAssigned = 1; // 1 replica (i.e., primary replica) already assigned
final String tier = primaryHolderToLoad.getServer().getTier();
// assign replicas for the rest of the tier
numAssigned += assignReplicasForTier(
tier,
targetReplicants.getOrDefault(tier, 0),
numAssigned, // note that the currentReplicantsInTier is the just-assigned primary replica.
params,
createLoadQueueSizeLimitingPredicate(params).and(holder -> !holder.equals(primaryHolderToLoad)),
segment
);
stats.addToTieredStat(ASSIGNED_COUNT, tier, numAssigned);
// do assign replicas for the other tiers.
assignReplicas(params, segment, stats, tier /* to skip */);
}
}
private static Predicate<ServerHolder> createLoadQueueSizeLimitingPredicate(
final DruidCoordinatorRuntimeParams params
)
{
CoordinatorStats stats = new CoordinatorStats();
// Make sure we have enough loaded replicants in the correct tiers in the cluster before doing anything
for (Integer leftToLoad : loadStatus.values()) {
if (leftToLoad > 0) {
return stats;
final int maxSegmentsInNodeLoadingQueue = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue();
if (maxSegmentsInNodeLoadingQueue <= 0) {
return Objects::nonNull;
} else {
return s -> (s != null && s.getNumberOfSegmentsInQueue() < maxSegmentsInNodeLoadingQueue);
}
}
// Find all instances of this segment across tiers
Map<String, Integer> replicantsByTier = params.getSegmentReplicantLookup().getClusterTiers(segment.getIdentifier());
private static List<ServerHolder> getFilteredHolders(
final String tier,
final DruidCluster druidCluster,
final Predicate<ServerHolder> predicate
)
{
final NavigableSet<ServerHolder> queue = druidCluster.getHistoricalsByTier(tier);
if (queue == null) {
log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit();
return Collections.emptyList();
}
for (Map.Entry<String, Integer> entry : replicantsByTier.entrySet()) {
final String tier = entry.getKey();
int loadedNumReplicantsForTier = entry.getValue();
int expectedNumReplicantsForTier = getNumReplicants(tier);
return queue.stream().filter(predicate).collect(Collectors.toList());
}
stats.addToTieredStat(DROPPED_COUNT, tier, 0);
MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getHistoricalsByTier(tier);
if (serverQueue == null) {
log.makeAlert("No holders found for tier[%s]", entry.getKey()).emit();
/**
* Iterates through each tier and find the respective segment homes; with the found segment homes, selects the one
* with the highest priority to be the holder for the primary replica.
*/
@Nullable
private ServerHolder assignPrimary(
final DruidCoordinatorRuntimeParams params,
final DataSegment segment
)
{
ServerHolder topCandidate = null;
for (final Object2IntMap.Entry<String> entry : targetReplicants.object2IntEntrySet()) {
final int targetReplicantsInTier = entry.getIntValue();
// sanity check: target number of replicants should be more than zero.
if (targetReplicantsInTier <= 0) {
continue;
}
List<ServerHolder> droppedServers = Lists.newArrayList();
while (loadedNumReplicantsForTier > expectedNumReplicantsForTier) {
final ServerHolder holder = serverQueue.pollLast();
final String tier = entry.getKey();
final List<ServerHolder> holders = getFilteredHolders(
tier,
params.getDruidCluster(),
createLoadQueueSizeLimitingPredicate(params)
);
// no holders satisfy the predicate
if (holders.isEmpty()) {
continue;
}
final ServerHolder candidate = params.getBalancerStrategy().findNewSegmentHomeReplicator(segment, holders);
if (candidate == null) {
log.warn(
"No available [%s] servers or node capacity to assign primary segment[%s]! " +
"Expected Replicants[%d]",
tier, segment.getIdentifier(), targetReplicantsInTier
);
} else {
// cache the result for later use.
strategyCache.put(tier, candidate);
if (topCandidate == null ||
candidate.getServer().getPriority() > topCandidate.getServer().getPriority()) {
topCandidate = candidate;
}
}
}
if (topCandidate != null) {
// remove tier for primary replica
strategyCache.remove(topCandidate.getServer().getTier());
topCandidate.getPeon().loadSegment(segment, null);
}
return topCandidate;
}
/**
* @param stats {@link CoordinatorStats} to accumulate assignment statistics.
* @param tierToSkip if not null, this tier will be skipped from doing assignment, use when primary replica was
* assigned.
*/
private void assignReplicas(
final DruidCoordinatorRuntimeParams params,
final DataSegment segment,
final CoordinatorStats stats,
@Nullable final String tierToSkip
)
{
for (final Object2IntMap.Entry<String> entry : targetReplicants.object2IntEntrySet()) {
final String tier = entry.getKey();
if (tier.equals(tierToSkip)) {
continue;
}
final int numAssigned = assignReplicasForTier(
tier,
entry.getIntValue(),
currentReplicants.getOrDefault(tier, 0),
params,
createLoadQueueSizeLimitingPredicate(params),
segment
);
stats.addToTieredStat(ASSIGNED_COUNT, tier, numAssigned);
}
}
/**
* @param predicate {@link Predicate} used to pre-filter {@link ServerHolder}s retrieved from {@link DruidCluster}.
*/
private int assignReplicasForTier(
final String tier,
final int targetReplicantsInTier,
final int currentReplicantsInTier,
final DruidCoordinatorRuntimeParams params,
final Predicate<ServerHolder> predicate,
final DataSegment segment
)
{
final int numToAssign = targetReplicantsInTier - currentReplicantsInTier;
// if nothing to assign
if (numToAssign <= 0) {
return 0;
}
final List<ServerHolder> holders = getFilteredHolders(tier, params.getDruidCluster(), predicate);
// if no holders available for assignment
if (holders.isEmpty()) {
return 0;
}
final ReplicationThrottler throttler = params.getReplicationManager();
for (int numAssigned = 0; numAssigned < numToAssign; numAssigned++) {
if (!throttler.canCreateReplicant(tier)) {
return numAssigned;
}
// Retrieves from cache if available
ServerHolder holder = strategyCache.remove(tier);
// Does strategy call if not in cache
if (holder == null) {
holder = params.getBalancerStrategy().findNewSegmentHomeReplicator(segment, holders);
}
if (holder == null) {
log.warn(
"No available [%s] servers or node capacity to assign segment[%s]! Expected Replicants[%d]",
tier, segment.getIdentifier(), targetReplicantsInTier
);
return numAssigned;
}
holders.remove(holder);
final String segmentId = segment.getIdentifier();
final String holderHost = holder.getServer().getHost();
throttler.registerReplicantCreation(tier, segmentId, holderHost);
holder.getPeon().loadSegment(segment, () -> throttler.unregisterReplicantCreation(tier, segmentId, holderHost));
}
return numToAssign;
}
/**
* @param stats {@link CoordinatorStats} to accumulate assignment statistics.
*/
private void drop(
final DruidCoordinatorRuntimeParams params,
final DataSegment segment,
final CoordinatorStats stats
)
{
final DruidCluster druidCluster = params.getDruidCluster();
// This enforces that loading is completed before we attempt to drop stuffs as a safety measure.
if (loadingInProgress(druidCluster)) {
return;
}
for (final Object2IntMap.Entry<String> entry : currentReplicants.object2IntEntrySet()) {
final String tier = entry.getKey();
final NavigableSet<ServerHolder> holders = druidCluster.getHistoricalsByTier(tier);
final int numDropped;
if (holders == null) {
log.makeAlert("No holders found for tier[%s]", tier).emit();
numDropped = 0;
} else {
final int currentReplicantsInTier = entry.getIntValue();
final int numToDrop = currentReplicantsInTier - targetReplicants.getOrDefault(tier, 0);
numDropped = dropForTier(numToDrop, holders, segment);
}
stats.addToTieredStat(DROPPED_COUNT, tier, numDropped);
}
}
/**
* Returns true if at least one tier in target replica assignment exists in cluster but does not have enough replicas.
*/
private boolean loadingInProgress(final DruidCluster druidCluster)
{
for (final Object2IntMap.Entry<String> entry : targetReplicants.object2IntEntrySet()) {
final String tier = entry.getKey();
// if there are replicants loading in cluster
if (druidCluster.hasTier(tier) && entry.getIntValue() > currentReplicants.getOrDefault(tier, 0)) {
return true;
}
}
return false;
}
private static int dropForTier(
final int numToDrop,
final NavigableSet<ServerHolder> holdersInTier,
final DataSegment segment
)
{
int numDropped = 0;
// Use the reverse order to get the holders with least available size first.
final Iterator<ServerHolder> iterator = holdersInTier.descendingIterator();
while (numDropped < numToDrop) {
if (!iterator.hasNext()) {
log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier());
break;
}
final ServerHolder holder = iterator.next();
if (holder.isServingSegment(segment)) {
holder.getPeon().dropSegment(
segment,
null
);
--loadedNumReplicantsForTier;
stats.addToTieredStat(DROPPED_COUNT, tier, 1);
holder.getPeon().dropSegment(segment, null);
++numDropped;
}
droppedServers.add(holder);
}
serverQueue.addAll(droppedServers);
}
return stats;
return numDropped;
}
protected void validateTieredReplicants(Map<String, Integer> tieredReplicants)
protected static void validateTieredReplicants(final Map<String, Integer> tieredReplicants)
{
if (tieredReplicants.size() == 0) {
throw new IAE("A rule with empty tiered replicants is invalid");

View File

@ -22,8 +22,6 @@ package io.druid.server.coordinator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;
import io.druid.client.ImmutableDruidDataSource;
import io.druid.client.ImmutableDruidServer;
import io.druid.java.util.common.Intervals;
@ -36,10 +34,14 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class DruidClusterTest
{
@ -142,8 +144,7 @@ public class DruidClusterTest
),
ImmutableMap.of(
"tier1",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
ImmutableList.of(
Stream.of(
new ServerHolder(
new ImmutableDruidServer(
new DruidServerMetadata("name1", "host1", null, 100L, ServerType.HISTORICAL, "tier1", 0),
@ -159,8 +160,7 @@ public class DruidClusterTest
),
new LoadQueuePeonTester()
)
)
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
}
@ -186,7 +186,7 @@ public class DruidClusterTest
cluster.add(newRealtime);
cluster.add(newHistorical);
final Set<ServerHolder> expectedRealtimes = cluster.getRealtimes();
final Map<String, MinMaxPriorityQueue<ServerHolder>> expectedHistoricals = cluster.getHistoricals();
final Map<String, NavigableSet<ServerHolder>> expectedHistoricals = cluster.getHistoricals();
final Collection<ServerHolder> allServers = cluster.getAllServers();
Assert.assertEquals(4, allServers.size());

View File

@ -24,13 +24,13 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import io.druid.java.util.common.DateTimes;
import io.druid.metadata.MetadataRuleManager;
import io.druid.server.coordinator.helper.DruidCoordinatorBalancer;
import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner;
import io.druid.server.coordinator.rules.PeriodLoadRule;
import io.druid.server.coordinator.rules.Rule;
@ -41,10 +41,12 @@ import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Before;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class DruidCoordinatorBalancerProfiler
{
@ -135,11 +137,14 @@ public class DruidCoordinatorBalancerProfiler
.withDruidCluster(
new DruidCluster(
null,
ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator)
.create(
serverHolderList
serverHolderList.stream().collect(
Collectors.toCollection(
() -> new TreeSet<>(
DruidCoordinatorBalancer.percentUsedComparator
)
)
)
)
)
@ -163,11 +168,14 @@ public class DruidCoordinatorBalancerProfiler
SegmentReplicantLookup.make(
new DruidCluster(
null,
ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator)
.create(
serverHolderList
serverHolderList.stream().collect(
Collectors.toCollection(
() -> new TreeSet<>(
DruidCoordinatorBalancer.percentUsedComparator
)
)
)
)
)
@ -219,13 +227,16 @@ public class DruidCoordinatorBalancerProfiler
.withDruidCluster(
new DruidCluster(
null,
ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator)
.create(
Arrays.asList(
Stream.of(
new ServerHolder(druidServer1, fromPeon),
new ServerHolder(druidServer2, toPeon)
).collect(
Collectors.toCollection(
() -> new TreeSet<>(
DruidCoordinatorBalancer.percentUsedComparator
)
)
)
)

View File

@ -23,11 +23,11 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.ImmutableDruidServer;
import io.druid.java.util.common.DateTimes;
import io.druid.server.coordinator.helper.DruidCoordinatorBalancer;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
@ -43,6 +43,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@ -284,12 +285,13 @@ public class DruidCoordinatorBalancerTest
null,
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator)
.create(
IntStream
.range(0, druidServers.size())
.mapToObj(i -> new ServerHolder(druidServers.get(i), peons.get(i)))
.collect(Collectors.toList())
.collect(
Collectors.toCollection(
() -> new TreeSet<>(DruidCoordinatorBalancer.percentUsedComparator)
)
)
)
)
@ -319,7 +321,7 @@ public class DruidCoordinatorBalancerTest
Map<String, DataSegment> segments
)
{
EasyMock.expect(druidServer.getName()).andReturn(name).atLeastOnce();
EasyMock.expect(druidServer.getName()).andReturn(name).anyTimes();
EasyMock.expect(druidServer.getTier()).andReturn(tier).anyTimes();
EasyMock.expect(druidServer.getCurrSize()).andReturn(currentSize).atLeastOnce();
EasyMock.expect(druidServer.getMaxSize()).andReturn(maxSize).atLeastOnce();

View File

@ -22,8 +22,6 @@ package io.druid.server.coordinator;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@ -52,10 +50,14 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
*/
@ -143,8 +145,7 @@ public class DruidCoordinatorRuleRunnerTest
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
new DruidServer(
"serverHot",
@ -157,11 +158,9 @@ public class DruidCoordinatorRuleRunnerTest
).toImmutableDruidServer(),
mockPeon
)
)
),
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))),
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
new DruidServer(
"serverNorm",
@ -174,11 +173,9 @@ public class DruidCoordinatorRuleRunnerTest
).toImmutableDruidServer(),
mockPeon
)
)
),
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))),
"cold",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
new DruidServer(
"serverCold",
@ -191,8 +188,7 @@ public class DruidCoordinatorRuleRunnerTest
).toImmutableDruidServer(),
mockPeon
)
)
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
@ -258,8 +254,7 @@ public class DruidCoordinatorRuleRunnerTest
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
new DruidServer(
"serverHot",
@ -284,11 +279,9 @@ public class DruidCoordinatorRuleRunnerTest
).toImmutableDruidServer(),
mockPeon
)
)
),
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))),
"cold",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
new DruidServer(
"serverCold",
@ -301,8 +294,7 @@ public class DruidCoordinatorRuleRunnerTest
).toImmutableDruidServer(),
mockPeon
)
)
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
@ -379,8 +371,7 @@ public class DruidCoordinatorRuleRunnerTest
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
new DruidServer(
"serverHot",
@ -393,17 +384,14 @@ public class DruidCoordinatorRuleRunnerTest
).toImmutableDruidServer(),
mockPeon
)
)
),
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))),
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
normServer.toImmutableDruidServer(),
mockPeon
)
)
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
@ -466,8 +454,7 @@ public class DruidCoordinatorRuleRunnerTest
null,
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
new DruidServer(
"serverNorm",
@ -480,8 +467,7 @@ public class DruidCoordinatorRuleRunnerTest
).toImmutableDruidServer(),
mockPeon
)
)
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
@ -524,14 +510,15 @@ public class DruidCoordinatorRuleRunnerTest
)
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).anyTimes();
EasyMock.replay(databaseRuleManager, mockPeon);
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
new DruidServer(
"serverNorm",
@ -544,8 +531,7 @@ public class DruidCoordinatorRuleRunnerTest
).toImmutableDruidServer(),
mockPeon
)
)
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
@ -560,7 +546,7 @@ public class DruidCoordinatorRuleRunnerTest
ruleRunner.run(params);
EasyMock.verify(emitter);
EasyMock.verify(emitter, mockPeon);
}
@Test
@ -605,14 +591,12 @@ public class DruidCoordinatorRuleRunnerTest
null,
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
server.toImmutableDruidServer(),
mockPeon
)
)
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
@ -689,8 +673,7 @@ public class DruidCoordinatorRuleRunnerTest
null,
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
server1.toImmutableDruidServer(),
mockPeon
@ -699,8 +682,7 @@ public class DruidCoordinatorRuleRunnerTest
server2.toImmutableDruidServer(),
mockPeon
)
)
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
@ -779,23 +761,19 @@ public class DruidCoordinatorRuleRunnerTest
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
server1.toImmutableDruidServer(),
mockPeon
)
)
),
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))),
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
server2.toImmutableDruidServer(),
mockPeon
)
)
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
@ -870,23 +848,19 @@ public class DruidCoordinatorRuleRunnerTest
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
server1.toImmutableDruidServer(),
mockPeon
)
)
),
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))),
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
server2.toImmutableDruidServer(),
mockPeon
)
)
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
@ -976,8 +950,7 @@ public class DruidCoordinatorRuleRunnerTest
null,
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
server1.toImmutableDruidServer(),
mockPeon
@ -990,8 +963,7 @@ public class DruidCoordinatorRuleRunnerTest
server3.toImmutableDruidServer(),
anotherMockPeon
)
)
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
@ -1050,8 +1022,7 @@ public class DruidCoordinatorRuleRunnerTest
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
new DruidServer(
"serverHot",
@ -1076,8 +1047,7 @@ public class DruidCoordinatorRuleRunnerTest
).toImmutableDruidServer(),
mockPeon
)
)
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
@ -1178,8 +1148,7 @@ public class DruidCoordinatorRuleRunnerTest
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
new DruidServer(
"serverHot",
@ -1188,15 +1157,13 @@ public class DruidCoordinatorRuleRunnerTest
1000,
ServerType.HISTORICAL,
"hot",
0
1
).toImmutableDruidServer(),
mockPeon
)
)
),
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))),
DruidServer.DEFAULT_TIER,
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
new DruidServer(
"serverNorm",
@ -1209,8 +1176,7 @@ public class DruidCoordinatorRuleRunnerTest
).toImmutableDruidServer(),
mockPeon
)
)
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
@ -1303,8 +1269,7 @@ public class DruidCoordinatorRuleRunnerTest
null,
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
server1.toImmutableDruidServer(),
mockPeon
@ -1313,8 +1278,7 @@ public class DruidCoordinatorRuleRunnerTest
server2.toImmutableDruidServer(),
mockPeon
)
)
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
@ -1388,8 +1352,7 @@ public class DruidCoordinatorRuleRunnerTest
null,
ImmutableMap.of(
DruidServer.DEFAULT_TIER,
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
new DruidServer(
"serverHot",
@ -1402,8 +1365,7 @@ public class DruidCoordinatorRuleRunnerTest
).toImmutableDruidServer(),
mockPeon
)
)
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);

View File

@ -22,11 +22,10 @@ package io.druid.server.coordinator.helper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;
import io.druid.client.ImmutableDruidDataSource;
import io.druid.client.ImmutableDruidServer;
import io.druid.java.util.common.DateTimes;
import io.druid.server.coordination.ServerType;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCluster;
import io.druid.server.coordinator.DruidCoordinator;
@ -41,6 +40,9 @@ import org.junit.Test;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class DruidCoordinatorCleanupOvershadowedTest
{
@ -71,11 +73,28 @@ public class DruidCoordinatorCleanupOvershadowedTest
druidCoordinatorCleanupOvershadowed = new DruidCoordinatorCleanupOvershadowed(coordinator);
availableSegments = ImmutableList.of(segmentV1, segmentV0, segmentV2);
druidCluster = new DruidCluster(
null,
ImmutableMap.of("normal", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Collections.singletonList(new ServerHolder(druidServer, mockPeon))
)));
// Dummy values for comparisons in TreeSet
EasyMock.expect(mockPeon.getLoadQueueSize())
.andReturn(0L)
.anyTimes();
EasyMock.expect(druidServer.getMaxSize())
.andReturn(0L)
.anyTimes();
EasyMock.expect(druidServer.getCurrSize())
.andReturn(0L)
.anyTimes();
EasyMock.expect(druidServer.getName())
.andReturn("")
.anyTimes();
EasyMock.expect(druidServer.getHost())
.andReturn("")
.anyTimes();
EasyMock.expect(druidServer.getTier())
.andReturn("")
.anyTimes();
EasyMock.expect(druidServer.getType())
.andReturn(ServerType.HISTORICAL)
.anyTimes();
EasyMock.expect(druidServer.getDataSources())
.andReturn(ImmutableList.of(druidDataSource))
@ -88,6 +107,16 @@ public class DruidCoordinatorCleanupOvershadowedTest
coordinator.removeSegment(segmentV0);
EasyMock.expectLastCall();
EasyMock.replay(mockPeon, coordinator, druidServer, druidDataSource);
druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"normal",
Stream.of(
new ServerHolder(druidServer, mockPeon)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
));
DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams.newBuilder()
.withAvailableSegments(availableSegments)
.withCoordinatorStats(new CoordinatorStats())

View File

@ -23,8 +23,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;
import io.druid.client.DruidServer;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
@ -40,7 +38,11 @@ import io.druid.timeline.partition.NoneShardSpec;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -198,21 +200,17 @@ public class BroadcastDistributionRuleTest
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Lists.newArrayList(
Stream.of(
holdersOfLargeSegments.get(0),
holderOfSmallSegment,
holdersOfLargeSegments2.get(0)
)
),
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))),
DruidServer.DEFAULT_TIER,
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Lists.newArrayList(
Stream.of(
holdersOfLargeSegments.get(1),
holdersOfLargeSegments.get(2),
holdersOfLargeSegments2.get(1)
)
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
}

View File

@ -23,8 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@ -61,8 +59,12 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
*/
@ -81,86 +83,63 @@ public class LoadRuleTest
)
);
private LoadQueuePeon mockPeon;
private ReplicationThrottler throttler;
private DataSegment segment;
private ListeningExecutorService exec;
private BalancerStrategy balancerStrategy;
private BalancerStrategy mockBalancerStrategy;
@Before
public void setUp() throws Exception
{
EmittingLogger.registerEmitter(emitter);
emitter.start();
mockPeon = EasyMock.createMock(LoadQueuePeon.class);
throttler = new ReplicationThrottler(2, 1);
for (String tier : Arrays.asList("hot", DruidServer.DEFAULT_TIER)) {
throttler.updateReplicationState(tier);
}
segment = createDataSegment("foo");
throttler = EasyMock.createMock(ReplicationThrottler.class);
exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
mockBalancerStrategy = EasyMock.createMock(BalancerStrategy.class);
}
@After
public void tearDown() throws Exception
{
EasyMock.verify(mockPeon);
exec.shutdown();
emitter.close();
}
@Test
public void testLoad() throws Exception
{
EasyMock.expect(throttler.canCreateReplicant(EasyMock.anyString())).andReturn(true).anyTimes();
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes();
EasyMock.replay(mockPeon);
LoadRule rule = new LoadRule()
{
private final Map<String, Integer> tiers = ImmutableMap.of(
LoadRule rule = createLoadRule(ImmutableMap.of(
"hot", 1,
DruidServer.DEFAULT_TIER, 2
);
));
@Override
public Map<String, Integer> getTieredReplicants()
{
return tiers;
}
final DataSegment segment = createDataSegment("foo");
@Override
public int getNumReplicants(String tier)
{
return tiers.get(tier);
}
throttler.registerReplicantCreation(DruidServer.DEFAULT_TIER, segment.getIdentifier(), "hostNorm");
EasyMock.expectLastCall().once();
@Override
public String getType()
{
return "test";
}
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.times(3);
@Override
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{
return true;
}
@Override
public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
{
return true;
}
};
EasyMock.replay(throttler, mockPeon, mockBalancerStrategy);
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
new DruidServer(
"serverHot",
@ -169,15 +148,13 @@ public class LoadRuleTest
1000,
ServerType.HISTORICAL,
"hot",
0
1
).toImmutableDruidServer(),
mockPeon
)
)
),
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))),
DruidServer.DEFAULT_TIER,
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
new DruidServer(
"serverNorm",
@ -190,81 +167,132 @@ public class LoadRuleTest
).toImmutableDruidServer(),
mockPeon
)
)
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
CoordinatorStats stats = rule.run(
null,
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withReplicationManager(throttler)
.withBalancerStrategy(balancerStrategy)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(Arrays.asList(segment)).build(),
segment
);
Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));
Assert.assertEquals(2L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, DruidServer.DEFAULT_TIER));
exec.shutdown();
Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, DruidServer.DEFAULT_TIER));
EasyMock.verify(throttler, mockPeon, mockBalancerStrategy);
}
@Test
public void testLoadPriority() throws Exception
{
EasyMock.expect(throttler.canCreateReplicant(EasyMock.anyString())).andReturn(false).anyTimes();
final LoadQueuePeon mockPeon1 = createEmptyPeon();
final LoadQueuePeon mockPeon2 = createEmptyPeon();
mockPeon2.loadSegment(EasyMock.anyObject(), EasyMock.isNull());
EasyMock.expectLastCall().once();
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.times(2);
EasyMock.replay(throttler, mockPeon1, mockPeon2, mockBalancerStrategy);
final LoadRule rule = createLoadRule(ImmutableMap.of(
"tier1", 10,
"tier2", 10
));
final DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"tier1",
Stream.of(
new ServerHolder(
new DruidServer(
"server1",
"host1",
null,
1000,
ServerType.HISTORICAL,
"tier1",
0
).toImmutableDruidServer(),
mockPeon1
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))),
"tier2",
Stream.of(
new ServerHolder(
new DruidServer(
"server2",
"host2",
null,
1000,
ServerType.HISTORICAL,
"tier2",
1
).toImmutableDruidServer(),
mockPeon2
),
new ServerHolder(
new DruidServer(
"server3",
"host3",
null,
1000,
ServerType.HISTORICAL,
"tier2",
1
).toImmutableDruidServer(),
mockPeon2
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
final DataSegment segment = createDataSegment("foo");
final CoordinatorStats stats = rule.run(
null,
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(Collections.singletonList(segment)).build(),
segment
);
Assert.assertEquals(0L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "tier1"));
Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "tier2"));
EasyMock.verify(throttler, mockPeon1, mockPeon2, mockBalancerStrategy);
}
@Test
public void testDrop() throws Exception
{
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).anyTimes();
EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes();
EasyMock.replay(mockPeon);
EasyMock.replay(throttler, mockPeon, mockBalancerStrategy);
LoadRule rule = new LoadRule()
{
private final Map<String, Integer> tiers = ImmutableMap.of(
LoadRule rule = createLoadRule(ImmutableMap.of(
"hot", 0,
DruidServer.DEFAULT_TIER, 0
);
));
@Override
public Map<String, Integer> getTieredReplicants()
{
return tiers;
}
@Override
public int getNumReplicants(String tier)
{
return tiers.get(tier);
}
@Override
public String getType()
{
return "test";
}
@Override
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{
return true;
}
@Override
public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
{
return true;
}
};
final DataSegment segment = createDataSegment("foo");
DruidServer server1 = new DruidServer(
"serverHot",
@ -286,42 +314,46 @@ public class LoadRuleTest
0
);
server2.addDataSegment(segment.getIdentifier(), segment);
DruidServer server3 = new DruidServer(
"serverNormNotServing",
"hostNorm",
null,
10,
ServerType.HISTORICAL,
DruidServer.DEFAULT_TIER,
0
);
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
server1.toImmutableDruidServer(),
mockPeon
)
)
),
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))),
DruidServer.DEFAULT_TIER,
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
server2.toImmutableDruidServer(),
mockPeon
),
new ServerHolder(
server3.toImmutableDruidServer(),
mockPeon
)
)
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
CoordinatorStats stats = rule.run(
null,
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withReplicationManager(throttler)
.withBalancerStrategy(balancerStrategy)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(Arrays.asList(segment)).build(),
segment
@ -329,64 +361,33 @@ public class LoadRuleTest
Assert.assertEquals(1L, stats.getTieredStat("droppedCount", "hot"));
Assert.assertEquals(1L, stats.getTieredStat("droppedCount", DruidServer.DEFAULT_TIER));
exec.shutdown();
EasyMock.verify(throttler, mockPeon);
}
@Test
public void testLoadWithNonExistentTier() throws Exception
{
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes();
EasyMock.replay(mockPeon);
LoadRule rule = new LoadRule()
{
private final Map<String, Integer> tiers = ImmutableMap.of(
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.times(1);
EasyMock.replay(throttler, mockPeon, mockBalancerStrategy);
LoadRule rule = createLoadRule(ImmutableMap.of(
"nonExistentTier", 1,
"hot", 1
);
@Override
public Map<String, Integer> getTieredReplicants()
{
return tiers;
}
@Override
public int getNumReplicants(String tier)
{
return tiers.get(tier);
}
@Override
public String getType()
{
return "test";
}
@Override
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{
return true;
}
@Override
public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
{
return true;
}
};
));
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
new DruidServer(
"serverHot",
@ -399,15 +400,11 @@ public class LoadRuleTest
).toImmutableDruidServer(),
mockPeon
)
)
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
final DataSegment segment = createDataSegment("foo");
CoordinatorStats stats = rule.run(
null,
@ -415,64 +412,32 @@ public class LoadRuleTest
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withReplicationManager(throttler)
.withBalancerStrategy(balancerStrategy)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(Arrays.asList(segment)).build(),
segment
);
Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));
exec.shutdown();
EasyMock.verify(throttler, mockPeon, mockBalancerStrategy);
}
@Test
public void testDropWithNonExistentTier() throws Exception
{
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).anyTimes();
EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes();
EasyMock.replay(mockPeon);
LoadRule rule = new LoadRule()
{
private final Map<String, Integer> tiers = ImmutableMap.of(
EasyMock.replay(throttler, mockPeon, mockBalancerStrategy);
LoadRule rule = createLoadRule(ImmutableMap.of(
"nonExistentTier", 1,
"hot", 1
);
));
@Override
public Map<String, Integer> getTieredReplicants()
{
return tiers;
}
@Override
public int getNumReplicants(String tier)
{
return tiers.get(tier);
}
@Override
public String getType()
{
return "test";
}
@Override
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{
return true;
}
@Override
public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
{
return true;
}
};
final DataSegment segment = createDataSegment("foo");
DruidServer server1 = new DruidServer(
"serverHot",
@ -499,8 +464,7 @@ public class LoadRuleTest
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
Stream.of(
new ServerHolder(
server1.toImmutableDruidServer(),
mockPeon
@ -509,54 +473,119 @@ public class LoadRuleTest
server2.toImmutableDruidServer(),
mockPeon
)
)
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
CoordinatorStats stats = rule.run(
null,
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withReplicationManager(throttler)
.withBalancerStrategy(balancerStrategy)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(Arrays.asList(segment)).build(),
segment
);
Assert.assertEquals(1L, stats.getTieredStat("droppedCount", "hot"));
exec.shutdown();
EasyMock.verify(throttler, mockPeon, mockBalancerStrategy);
}
@Test
public void testMaxLoadingQueueSize() throws Exception
{
EasyMock.replay(mockPeon);
LoadQueuePeonTester peon = new LoadQueuePeonTester();
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.times(2);
LoadRule rule = new LoadRule()
{
private final Map<String, Integer> tiers = ImmutableMap.of(
EasyMock.replay(throttler, mockBalancerStrategy);
final LoadQueuePeonTester peon = new LoadQueuePeonTester();
LoadRule rule = createLoadRule(ImmutableMap.of(
"hot", 1
));
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
Stream.of(
new ServerHolder(
new DruidServer(
"serverHot",
"hostHot",
null,
1000,
ServerType.HISTORICAL,
"hot",
0
).toImmutableDruidServer(),
peon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
DataSegment dataSegment1 = createDataSegment("ds1");
DataSegment dataSegment2 = createDataSegment("ds2");
DataSegment dataSegment3 = createDataSegment("ds3");
DruidCoordinatorRuntimeParams params =
DruidCoordinatorRuntimeParams
.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(Arrays.asList(dataSegment1, dataSegment2, dataSegment3))
.withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMaxSegmentsInNodeLoadingQueue(2).build())
.build();
CoordinatorStats stats1 = rule.run(null, params, dataSegment1);
CoordinatorStats stats2 = rule.run(null, params, dataSegment2);
CoordinatorStats stats3 = rule.run(null, params, dataSegment3);
Assert.assertEquals(1L, stats1.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));
Assert.assertEquals(1L, stats2.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));
Assert.assertFalse(stats3.getTiers(LoadRule.ASSIGNED_COUNT).contains("hot"));
EasyMock.verify(throttler, mockBalancerStrategy);
}
private DataSegment createDataSegment(String dataSource)
{
return new DataSegment(
dataSource,
Intervals.of("0/3000"),
DateTimes.nowUtc().toString(),
Maps.newHashMap(),
Lists.newArrayList(),
Lists.newArrayList(),
NoneShardSpec.instance(),
0,
0
);
}
private static LoadRule createLoadRule(final Map<String, Integer> tieredReplicants)
{
return new LoadRule()
{
@Override
public Map<String, Integer> getTieredReplicants()
{
return tiers;
return tieredReplicants;
}
@Override
public int getNumReplicants(String tier)
{
return tiers.get(tier);
return tieredReplicants.get(tier);
}
@Override
@ -577,73 +606,16 @@ public class LoadRuleTest
return true;
}
};
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
new DruidServer(
"serverHot",
"hostHot",
null,
1000,
ServerType.HISTORICAL,
"hot",
0
).toImmutableDruidServer(),
peon
)
)
)
)
);
ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);
DataSegment dataSegment1 = createDataSegment("ds1");
DataSegment dataSegment2 = createDataSegment("ds2");
DataSegment dataSegment3 = createDataSegment("ds3");
DruidCoordinatorRuntimeParams params =
DruidCoordinatorRuntimeParams
.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withReplicationManager(throttler)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(Arrays.asList(dataSegment1, dataSegment2, dataSegment3))
.withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMaxSegmentsInNodeLoadingQueue(2).build())
.build();
CoordinatorStats stats1 = rule.run(null, params, dataSegment1);
CoordinatorStats stats2 = rule.run(null, params, dataSegment2);
CoordinatorStats stats3 = rule.run(null, params, dataSegment3);
Assert.assertEquals(1L, stats1.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));
Assert.assertEquals(1L, stats2.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));
Assert.assertEquals(0L, stats3.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));
exec.shutdown();
}
private DataSegment createDataSegment(String dataSource)
private static LoadQueuePeon createEmptyPeon()
{
return new DataSegment(
dataSource,
Intervals.of("0/3000"),
DateTimes.nowUtc().toString(),
Maps.newHashMap(),
Lists.newArrayList(),
Lists.newArrayList(),
NoneShardSpec.instance(),
0,
0
);
final LoadQueuePeon mockPeon = EasyMock.createMock(LoadQueuePeon.class);
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).anyTimes();
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).anyTimes();
EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes();
return mockPeon;
}
}