From 26fd2b3a8ebd16d47cb4291405cea1777a7ad0f2 Mon Sep 17 00:00:00 2001 From: Goh Wei Xiang Date: Thu, 28 Sep 2017 13:02:05 -0700 Subject: [PATCH] Priority on loading for primary replica (#4757) * Priority on loading for primary replica * Simplicity fixes * Fix on skipping drop for quick return. * change to debug logging for no replicants. * Fix on filter logic * swapping if-else * Fix on wrong "hasTier" logic * Refactoring of LoadRule * Rename createPredicate to createLoadQueueSizeLimitingPredicate * Rename getHolderList to getFilteredHolders * remove varargs * extract out currentReplicantsInTier * rename holders to holdersInTier * don't do temporary removal of tier. * rename primaryTier to tierToSkip * change LinkedList to ArrayList * Change MinMaxPriorityQueue in DruidCluster to TreeSet. * Adding some comments. * Modify log messages in light of predicates. * Add in-method comments * Don't create new Object2IntOpenHashMap for each run() call. * Cache result from strategy call in the primary assignment to be reused during the same run. * Spelling mistake * Cleaning up javadoc. * refactor out loading in progress check. * Removed redundant comment. * Removed forbidden API * Correct non-forbidden API. * Precision in variable type for NavigableSet. * Obsolete comment. * Clarity in method call and moving retrieval of ServerHolder into method call. * Comment on mutability of CoordinatorStats. * Added auxiliary fixture for dropping. --- .../server/coordinator/DruidCluster.java | 23 +- .../coordinator/SegmentReplicantLookup.java | 4 +- .../server/coordinator/ServerHolder.java | 32 +- .../helper/DruidCoordinatorBalancer.java | 7 +- .../DruidCoordinatorCleanupOvershadowed.java | 5 +- .../DruidCoordinatorCleanupUnneeded.java | 5 +- .../helper/DruidCoordinatorLogger.java | 3 +- .../server/coordinator/rules/LoadRule.java | 460 +++++++----- .../server/coordinator/DruidClusterTest.java | 40 +- .../DruidCoordinatorBalancerProfiler.java | 51 +- .../DruidCoordinatorBalancerTest.java | 20 +- .../DruidCoordinatorRuleRunnerTest.java | 530 +++++++------- ...uidCoordinatorCleanupOvershadowedTest.java | 43 +- .../rules/BroadcastDistributionRuleTest.java | 30 +- .../coordinator/rules/LoadRuleTest.java | 656 +++++++++--------- 15 files changed, 1019 insertions(+), 890 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCluster.java b/server/src/main/java/io/druid/server/coordinator/DruidCluster.java index 014c42ba91c..b034c24bae5 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCluster.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCluster.java @@ -20,19 +20,20 @@ package io.druid.server.coordinator; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.MinMaxPriorityQueue; -import com.google.common.collect.Ordering; import io.druid.client.ImmutableDruidServer; import io.druid.java.util.common.IAE; import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.NavigableSet; import java.util.Set; +import java.util.TreeSet; /** * Contains a representation of the current state of the cluster by tier.
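Note on the MinMaxPriorityQueue -> TreeSet switch that starts in this file: a TreeSet built with Collections.reverseOrder() orders holders by their reversed natural ordering, so iteration starts at the server with the most available space, while descendingIterator() (used by the new LoadRule#dropForTier further down in this patch) starts at the least available one. A minimal standalone sketch of that ordering, assuming nothing beyond the JDK; SimpleHolder is a hypothetical stand-in for ServerHolder, not Druid code:

import java.util.Collections;
import java.util.NavigableSet;
import java.util.TreeSet;

public class ReverseOrderSketch
{
  // Hypothetical stand-in for ServerHolder: natural order is by available size,
  // with a host tie-breaker so distinct servers never compare as equal.
  static final class SimpleHolder implements Comparable<SimpleHolder>
  {
    final String host;
    final long availableSize;

    SimpleHolder(String host, long availableSize)
    {
      this.host = host;
      this.availableSize = availableSize;
    }

    @Override
    public int compareTo(SimpleHolder other)
    {
      int result = Long.compare(availableSize, other.availableSize);
      return result != 0 ? result : host.compareTo(other.host);
    }

    @Override
    public String toString()
    {
      return host + "(" + availableSize + ")";
    }
  }

  public static void main(String[] args)
  {
    // Reverse natural order: iteration starts at the largest element.
    NavigableSet<SimpleHolder> tier = new TreeSet<>(Collections.reverseOrder());
    tier.add(new SimpleHolder("host1", 100L));
    tier.add(new SimpleHolder("host2", 900L));
    tier.add(new SimpleHolder("host3", 500L));

    // Most available space first: [host2(900), host3(500), host1(100)]
    System.out.println(tier);

    // descendingIterator() starts at the least available holder: host1(100)
    System.out.println(tier.descendingIterator().next());
  }
}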
@@ -41,7 +42,7 @@ import java.util.Set; public class DruidCluster { private final Set realtimes; - private final Map> historicals; + private final Map> historicals; public DruidCluster() { @@ -52,7 +53,7 @@ public class DruidCluster @VisibleForTesting public DruidCluster( @Nullable Set realtimes, - Map> historicals + Map> historicals ) { this.realtimes = realtimes == null ? new HashSet<>() : new HashSet<>(realtimes); @@ -86,9 +87,9 @@ public class DruidCluster private void addHistorical(ServerHolder serverHolder) { final ImmutableDruidServer server = serverHolder.getServer(); - final MinMaxPriorityQueue tierServers = historicals.computeIfAbsent( + final NavigableSet tierServers = historicals.computeIfAbsent( server.getTier(), - k -> MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create() + k -> new TreeSet<>(Collections.reverseOrder()) ); tierServers.add(serverHolder); } @@ -98,7 +99,7 @@ public class DruidCluster return realtimes; } - public Map> getHistoricals() + public Map> getHistoricals() { return historicals; } @@ -108,7 +109,7 @@ public class DruidCluster return historicals.keySet(); } - public MinMaxPriorityQueue getHistoricalsByTier(String tier) + public NavigableSet getHistoricalsByTier(String tier) { return historicals.get(tier); } @@ -124,7 +125,7 @@ public class DruidCluster return allServers; } - public Iterable> getSortedHistoricalsByTier() + public Iterable> getSortedHistoricalsByTier() { return historicals.values(); } @@ -146,7 +147,7 @@ public class DruidCluster public boolean hasTier(String tier) { - MinMaxPriorityQueue servers = historicals.get(tier); - return (servers == null) || servers.isEmpty(); + NavigableSet servers = historicals.get(tier); + return (servers != null) && !servers.isEmpty(); } } diff --git a/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java b/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java index 785fce77fee..f23f1199e86 100644 --- a/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java +++ b/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java @@ -21,12 +21,12 @@ package io.druid.server.coordinator; import com.google.common.collect.HashBasedTable; import com.google.common.collect.Maps; -import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.Table; import io.druid.client.ImmutableDruidServer; import io.druid.timeline.DataSegment; import java.util.Map; +import java.util.SortedSet; /** * A lookup for the number of replicants of a given segment for a certain tier. 
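One subtlety behind the expanded ServerHolder.compareTo in the hunk below: unlike MinMaxPriorityQueue, a TreeSet keeps only one of any two elements that compare as equal, so if holders were still compared by available size alone, two servers with identical free space would silently collapse into a single set entry. A small sketch of the pitfall, using a hypothetical Server class rather than Druid's types:

import java.util.Comparator;
import java.util.TreeSet;

public class TreeSetDedupPitfall
{
  // Hypothetical server record, purely for illustration.
  static final class Server
  {
    final String host;
    final long availableSize;

    Server(String host, long availableSize)
    {
      this.host = host;
      this.availableSize = availableSize;
    }
  }

  public static void main(String[] args)
  {
    // Comparator on available size alone: host2 compares equal to host1,
    // so the TreeSet silently rejects it and one server "disappears".
    TreeSet<Server> bySizeOnly = new TreeSet<>(
        Comparator.comparingLong((Server s) -> s.availableSize).reversed()
    );
    bySizeOnly.add(new Server("host1", 100L));
    bySizeOnly.add(new Server("host2", 100L));
    System.out.println(bySizeOnly.size()); // prints 1

    // With a host tie-breaker, equal-sized servers stay distinct.
    TreeSet<Server> withTieBreaker = new TreeSet<>(
        Comparator.comparingLong((Server s) -> s.availableSize)
                  .thenComparing(s -> s.host)
                  .reversed()
    );
    withTieBreaker.add(new Server("host1", 100L));
    withTieBreaker.add(new Server("host2", 100L));
    System.out.println(withTieBreaker.size()); // prints 2
  }
}

This appears to be why compareTo falls back to host, tier, and type after comparing available size.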
@@ -38,7 +38,7 @@ public class SegmentReplicantLookup final Table segmentsInCluster = HashBasedTable.create(); final Table loadingSegments = HashBasedTable.create(); - for (MinMaxPriorityQueue serversByType : cluster.getSortedHistoricalsByTier()) { + for (SortedSet serversByType : cluster.getSortedHistoricalsByTier()) { for (ServerHolder serverHolder : serversByType) { ImmutableDruidServer server = serverHolder.getServer(); diff --git a/server/src/main/java/io/druid/server/coordinator/ServerHolder.java b/server/src/main/java/io/druid/server/coordinator/ServerHolder.java index ad4fd7379dc..ada6451a71e 100644 --- a/server/src/main/java/io/druid/server/coordinator/ServerHolder.java +++ b/server/src/main/java/io/druid/server/coordinator/ServerHolder.java @@ -19,6 +19,7 @@ package io.druid.server.coordinator; +import com.google.common.primitives.Longs; import io.druid.client.ImmutableDruidServer; import io.druid.java.util.common.logger.Logger; import io.druid.timeline.DataSegment; @@ -52,32 +53,32 @@ public class ServerHolder implements Comparable return peon; } - public Long getMaxSize() + public long getMaxSize() { return server.getMaxSize(); } - public Long getCurrServerSize() + public long getCurrServerSize() { return server.getCurrSize(); } - public Long getLoadQueueSize() + public long getLoadQueueSize() { return peon.getLoadQueueSize(); } - public Long getSizeUsed() + public long getSizeUsed() { return getCurrServerSize() + getLoadQueueSize(); } - public Double getPercentUsed() + public double getPercentUsed() { - return (100 * getSizeUsed().doubleValue()) / getMaxSize(); + return (100.0 * getSizeUsed()) / getMaxSize(); } - public Long getAvailableSize() + public long getAvailableSize() { long maxSize = getMaxSize(); long sizeUsed = getSizeUsed(); @@ -114,7 +115,22 @@ public class ServerHolder implements Comparable @Override public int compareTo(ServerHolder serverHolder) { - return getAvailableSize().compareTo(serverHolder.getAvailableSize()); + int result = Longs.compare(getAvailableSize(), serverHolder.getAvailableSize()); + if (result != 0) { + return result; + } + + result = server.getHost().compareTo(serverHolder.server.getHost()); + if (result != 0) { + return result; + } + + result = server.getTier().compareTo(serverHolder.server.getTier()); + if (result != 0) { + return result; + } + + return server.getType().compareTo(serverHolder.server.getType()); } @Override diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java index 27360fcd749..0ec2aeb494b 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java @@ -20,7 +20,6 @@ package io.druid.server.coordinator.helper; import com.google.common.collect.Lists; -import com.google.common.collect.MinMaxPriorityQueue; import com.metamx.emitter.EmittingLogger; import io.druid.client.ImmutableDruidServer; import io.druid.java.util.common.StringUtils; @@ -38,6 +37,8 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.NavigableSet; +import java.util.SortedSet; import java.util.concurrent.ConcurrentHashMap; /** @@ -78,7 +79,7 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { final CoordinatorStats stats = new 
CoordinatorStats(); - params.getDruidCluster().getHistoricals().forEach((String tier, MinMaxPriorityQueue servers) -> { + params.getDruidCluster().getHistoricals().forEach((String tier, NavigableSet servers) -> { balanceTier(params, tier, servers, stats); }); return params.buildFromExisting().withCoordinatorStats(stats).build(); @@ -87,7 +88,7 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper private void balanceTier( DruidCoordinatorRuntimeParams params, String tier, - MinMaxPriorityQueue servers, + SortedSet servers, CoordinatorStats stats ) { diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java index 2819a51ec4c..779c186ea03 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java @@ -20,8 +20,6 @@ package io.druid.server.coordinator.helper; import com.google.common.collect.Maps; -import com.google.common.collect.MinMaxPriorityQueue; - import io.druid.client.ImmutableDruidDataSource; import io.druid.client.ImmutableDruidServer; import io.druid.java.util.common.guava.Comparators; @@ -34,6 +32,7 @@ import io.druid.timeline.DataSegment; import io.druid.timeline.VersionedIntervalTimeline; import java.util.Map; +import java.util.SortedSet; public class DruidCoordinatorCleanupOvershadowed implements DruidCoordinatorHelper { @@ -55,7 +54,7 @@ public class DruidCoordinatorCleanupOvershadowed implements DruidCoordinatorHelp DruidCluster cluster = params.getDruidCluster(); Map> timelines = Maps.newHashMap(); - for (MinMaxPriorityQueue serverHolders : cluster.getSortedHistoricalsByTier()) { + for (SortedSet serverHolders : cluster.getSortedHistoricalsByTier()) { for (ServerHolder serverHolder : serverHolders) { ImmutableDruidServer server = serverHolder.getServer(); diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java index 89b4deb3175..5471f3116aa 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java @@ -19,8 +19,6 @@ package io.druid.server.coordinator.helper; -import com.google.common.collect.MinMaxPriorityQueue; - import io.druid.client.ImmutableDruidDataSource; import io.druid.client.ImmutableDruidServer; import io.druid.java.util.common.logger.Logger; @@ -34,6 +32,7 @@ import io.druid.server.coordinator.ServerHolder; import io.druid.timeline.DataSegment; import java.util.Set; +import java.util.SortedSet; /** */ @@ -64,7 +63,7 @@ public class DruidCoordinatorCleanupUnneeded implements DruidCoordinatorHelper // This is done to prevent a race condition in which the coordinator would drop all segments if it started running // cleanup before it finished polling the metadata storage for available segments for the first time. 
if (!availableSegments.isEmpty()) { - for (MinMaxPriorityQueue serverHolders : cluster.getSortedHistoricalsByTier()) { + for (SortedSet serverHolders : cluster.getSortedHistoricalsByTier()) { for (ServerHolder serverHolder : serverHolders) { ImmutableDruidServer server = serverHolder.getServer(); diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java index 3820dfd0ee1..d295d47e493 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java @@ -19,7 +19,6 @@ package io.druid.server.coordinator.helper; -import com.google.common.collect.MinMaxPriorityQueue; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.client.DruidDataSource; @@ -171,7 +170,7 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper ); log.info("Load Queues:"); - for (MinMaxPriorityQueue serverHolders : cluster.getSortedHistoricalsByTier()) { + for (Iterable serverHolders : cluster.getSortedHistoricalsByTier()) { for (ServerHolder serverHolder : serverHolders) { ImmutableDruidServer server = serverHolder.getServer(); LoadQueuePeon queuePeon = serverHolder.getPeon(); diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java index f14a4f3581e..2d6c163ced8 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java @@ -19,24 +19,26 @@ package io.druid.server.coordinator.rules; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.MinMaxPriorityQueue; import com.metamx.emitter.EmittingLogger; import io.druid.java.util.common.IAE; -import io.druid.server.coordinator.BalancerStrategy; import io.druid.server.coordinator.CoordinatorStats; +import io.druid.server.coordinator.DruidCluster; import io.druid.server.coordinator.DruidCoordinator; import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; -import io.druid.server.coordinator.LoadPeonCallback; import io.druid.server.coordinator.ReplicationThrottler; import io.druid.server.coordinator.ServerHolder; import io.druid.timeline.DataSegment; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NavigableSet; import java.util.Objects; -import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -49,188 +51,326 @@ public abstract class LoadRule implements Rule static final String ASSIGNED_COUNT = "assignedCount"; static final String DROPPED_COUNT = "droppedCount"; + private final Object2IntMap targetReplicants = new Object2IntOpenHashMap<>(); + private final Object2IntMap currentReplicants = new Object2IntOpenHashMap<>(); + + // Cache to hold unused results from strategy call in assignPrimary + private final Map strategyCache = new HashMap<>(); + @Override - public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment) - { - final CoordinatorStats stats = new 
CoordinatorStats(); - final Set availableSegments = params.getAvailableSegments(); - - final Map loadStatus = Maps.newHashMap(); - - int totalReplicantsInCluster = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier()); - for (Map.Entry entry : getTieredReplicants().entrySet()) { - final String tier = entry.getKey(); - final int expectedReplicantsInTier = entry.getValue(); - final int totalReplicantsInTier = params.getSegmentReplicantLookup() - .getTotalReplicants(segment.getIdentifier(), tier); - final int loadedReplicantsInTier = params.getSegmentReplicantLookup() - .getLoadedReplicants(segment.getIdentifier(), tier); - - final MinMaxPriorityQueue serverQueue = params.getDruidCluster().getHistoricalsByTier(tier); - - if (serverQueue == null) { - log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit(); - continue; - } - - final int maxSegmentsInNodeLoadingQueue = params.getCoordinatorDynamicConfig() - .getMaxSegmentsInNodeLoadingQueue(); - - Predicate serverHolderPredicate; - if (maxSegmentsInNodeLoadingQueue > 0) { - serverHolderPredicate = s -> (s != null && s.getNumberOfSegmentsInQueue() < maxSegmentsInNodeLoadingQueue); - } else { - serverHolderPredicate = Objects::nonNull; - } - - final List serverHolderList = serverQueue.stream() - .filter(serverHolderPredicate) - .collect(Collectors.toList()); - - final BalancerStrategy strategy = params.getBalancerStrategy(); - if (availableSegments.contains(segment)) { - int assignedCount = assign( - params.getReplicationManager(), - tier, - totalReplicantsInCluster, - expectedReplicantsInTier, - totalReplicantsInTier, - strategy, - serverHolderList, - segment - ); - stats.addToTieredStat(ASSIGNED_COUNT, tier, assignedCount); - totalReplicantsInCluster += assignedCount; - } - - loadStatus.put(tier, expectedReplicantsInTier - loadedReplicantsInTier); - } - // Remove over-replication - stats.accumulate(drop(loadStatus, segment, params)); - - - return stats; - } - - private int assign( - final ReplicationThrottler replicationManager, - final String tier, - final int totalReplicantsInCluster, - final int expectedReplicantsInTier, - final int totalReplicantsInTier, - final BalancerStrategy strategy, - final List serverHolderList, + public CoordinatorStats run( + final DruidCoordinator coordinator, + final DruidCoordinatorRuntimeParams params, final DataSegment segment ) { - int assignedCount = 0; - int currReplicantsInTier = totalReplicantsInTier; - int currTotalReplicantsInCluster = totalReplicantsInCluster; - while (currReplicantsInTier < expectedReplicantsInTier) { - boolean replicate = currTotalReplicantsInCluster > 0; + try { + // get the "snapshots" of targetReplicants and currentReplicants for assignments. + targetReplicants.putAll(getTieredReplicants()); + currentReplicants.putAll(params.getSegmentReplicantLookup().getClusterTiers(segment.getIdentifier())); - if (replicate && !replicationManager.canCreateReplicant(tier)) { - break; + final CoordinatorStats stats = new CoordinatorStats(); + + if (params.getAvailableSegments().contains(segment)) { + assign(params, segment, stats); } - final ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList); + drop(params, segment, stats); - if (holder == null) { - log.warn( - "Not enough [%s] servers or node capacity to assign segment[%s]! 
Expected Replicants[%d]", - tier, - segment.getIdentifier(), - expectedReplicantsInTier - ); - break; - } - - if (replicate) { - replicationManager.registerReplicantCreation( - tier, segment.getIdentifier(), holder.getServer().getHost() - ); - } - - holder.getPeon().loadSegment( - segment, - new LoadPeonCallback() - { - @Override - public void execute() - { - replicationManager.unregisterReplicantCreation( - tier, - segment.getIdentifier(), - holder.getServer().getHost() - ); - } - } - ); - - ++assignedCount; - ++currReplicantsInTier; - ++currTotalReplicantsInCluster; + return stats; + } + finally { + targetReplicants.clear(); + currentReplicants.clear(); + strategyCache.clear(); } - - return assignedCount; } - private CoordinatorStats drop( - final Map loadStatus, + /** + * @param stats {@link CoordinatorStats} to accumulate assignment statistics. + */ + private void assign( + final DruidCoordinatorRuntimeParams params, final DataSegment segment, + final CoordinatorStats stats + ) + { + // if primary replica already exists + if (!currentReplicants.isEmpty()) { + assignReplicas(params, segment, stats, null); + } else { + final ServerHolder primaryHolderToLoad = assignPrimary(params, segment); + if (primaryHolderToLoad == null) { + // cluster does not have any replicants and cannot identify primary holder + // this implies that no assignment could be done + return; + } + + int numAssigned = 1; // 1 replica (i.e., primary replica) already assigned + + final String tier = primaryHolderToLoad.getServer().getTier(); + // assign replicas for the rest of the tier + numAssigned += assignReplicasForTier( + tier, + targetReplicants.getOrDefault(tier, 0), + numAssigned, // note that the currentReplicantsInTier is the just-assigned primary replica. + params, + createLoadQueueSizeLimitingPredicate(params).and(holder -> !holder.equals(primaryHolderToLoad)), + segment + ); + stats.addToTieredStat(ASSIGNED_COUNT, tier, numAssigned); + + // do assign replicas for the other tiers. + assignReplicas(params, segment, stats, tier /* to skip */); + } + } + + private static Predicate createLoadQueueSizeLimitingPredicate( final DruidCoordinatorRuntimeParams params ) { - CoordinatorStats stats = new CoordinatorStats(); + final int maxSegmentsInNodeLoadingQueue = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue(); + if (maxSegmentsInNodeLoadingQueue <= 0) { + return Objects::nonNull; + } else { + return s -> (s != null && s.getNumberOfSegmentsInQueue() < maxSegmentsInNodeLoadingQueue); + } + } - // Make sure we have enough loaded replicants in the correct tiers in the cluster before doing anything - for (Integer leftToLoad : loadStatus.values()) { - if (leftToLoad > 0) { - return stats; - } + private static List getFilteredHolders( + final String tier, + final DruidCluster druidCluster, + final Predicate predicate + ) + { + final NavigableSet queue = druidCluster.getHistoricalsByTier(tier); + if (queue == null) { + log.makeAlert("Tier[%s] has no servers! 
Check your cluster configuration!", tier).emit(); + return Collections.emptyList(); } - // Find all instances of this segment across tiers - Map replicantsByTier = params.getSegmentReplicantLookup().getClusterTiers(segment.getIdentifier()); + return queue.stream().filter(predicate).collect(Collectors.toList()); + } - for (Map.Entry entry : replicantsByTier.entrySet()) { - final String tier = entry.getKey(); - int loadedNumReplicantsForTier = entry.getValue(); - int expectedNumReplicantsForTier = getNumReplicants(tier); - - stats.addToTieredStat(DROPPED_COUNT, tier, 0); - - MinMaxPriorityQueue serverQueue = params.getDruidCluster().getHistoricalsByTier(tier); - if (serverQueue == null) { - log.makeAlert("No holders found for tier[%s]", entry.getKey()).emit(); + /** + * Iterates through each tier and find the respective segment homes; with the found segment homes, selects the one + * with the highest priority to be the holder for the primary replica. + */ + @Nullable + private ServerHolder assignPrimary( + final DruidCoordinatorRuntimeParams params, + final DataSegment segment + ) + { + ServerHolder topCandidate = null; + for (final Object2IntMap.Entry entry : targetReplicants.object2IntEntrySet()) { + final int targetReplicantsInTier = entry.getIntValue(); + // sanity check: target number of replicants should be more than zero. + if (targetReplicantsInTier <= 0) { continue; } - List droppedServers = Lists.newArrayList(); - while (loadedNumReplicantsForTier > expectedNumReplicantsForTier) { - final ServerHolder holder = serverQueue.pollLast(); - if (holder == null) { - log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier()); - break; - } + final String tier = entry.getKey(); - if (holder.isServingSegment(segment)) { - holder.getPeon().dropSegment( - segment, - null - ); - --loadedNumReplicantsForTier; - stats.addToTieredStat(DROPPED_COUNT, tier, 1); - } - droppedServers.add(holder); + final List holders = getFilteredHolders( + tier, + params.getDruidCluster(), + createLoadQueueSizeLimitingPredicate(params) + ); + // no holders satisfy the predicate + if (holders.isEmpty()) { + continue; + } + + final ServerHolder candidate = params.getBalancerStrategy().findNewSegmentHomeReplicator(segment, holders); + if (candidate == null) { + log.warn( + "No available [%s] servers or node capacity to assign primary segment[%s]! " + + "Expected Replicants[%d]", + tier, segment.getIdentifier(), targetReplicantsInTier + ); + } else { + // cache the result for later use. + strategyCache.put(tier, candidate); + if (topCandidate == null || + candidate.getServer().getPriority() > topCandidate.getServer().getPriority()) { + topCandidate = candidate; + } } - serverQueue.addAll(droppedServers); } - return stats; + if (topCandidate != null) { + // remove tier for primary replica + strategyCache.remove(topCandidate.getServer().getTier()); + topCandidate.getPeon().loadSegment(segment, null); + } + + return topCandidate; } - protected void validateTieredReplicants(Map tieredReplicants) + /** + * @param stats {@link CoordinatorStats} to accumulate assignment statistics. + * @param tierToSkip if not null, this tier will be skipped from doing assignment, use when primary replica was + * assigned. 
+ */ + private void assignReplicas( + final DruidCoordinatorRuntimeParams params, + final DataSegment segment, + final CoordinatorStats stats, + @Nullable final String tierToSkip + ) + { + for (final Object2IntMap.Entry entry : targetReplicants.object2IntEntrySet()) { + final String tier = entry.getKey(); + if (tier.equals(tierToSkip)) { + continue; + } + final int numAssigned = assignReplicasForTier( + tier, + entry.getIntValue(), + currentReplicants.getOrDefault(tier, 0), + params, + createLoadQueueSizeLimitingPredicate(params), + segment + ); + stats.addToTieredStat(ASSIGNED_COUNT, tier, numAssigned); + } + } + + /** + * @param predicate {@link Predicate} used to pre-filter {@link ServerHolder}s retrieved from {@link DruidCluster}. + */ + private int assignReplicasForTier( + final String tier, + final int targetReplicantsInTier, + final int currentReplicantsInTier, + final DruidCoordinatorRuntimeParams params, + final Predicate predicate, + final DataSegment segment + ) + { + final int numToAssign = targetReplicantsInTier - currentReplicantsInTier; + // if nothing to assign + if (numToAssign <= 0) { + return 0; + } + + final List holders = getFilteredHolders(tier, params.getDruidCluster(), predicate); + // if no holders available for assignment + if (holders.isEmpty()) { + return 0; + } + + final ReplicationThrottler throttler = params.getReplicationManager(); + for (int numAssigned = 0; numAssigned < numToAssign; numAssigned++) { + if (!throttler.canCreateReplicant(tier)) { + return numAssigned; + } + + // Retrieves from cache if available + ServerHolder holder = strategyCache.remove(tier); + // Does strategy call if not in cache + if (holder == null) { + holder = params.getBalancerStrategy().findNewSegmentHomeReplicator(segment, holders); + } + + if (holder == null) { + log.warn( + "No available [%s] servers or node capacity to assign segment[%s]! Expected Replicants[%d]", + tier, segment.getIdentifier(), targetReplicantsInTier + ); + return numAssigned; + } + holders.remove(holder); + + final String segmentId = segment.getIdentifier(); + final String holderHost = holder.getServer().getHost(); + throttler.registerReplicantCreation(tier, segmentId, holderHost); + holder.getPeon().loadSegment(segment, () -> throttler.unregisterReplicantCreation(tier, segmentId, holderHost)); + } + + return numToAssign; + } + + /** + * @param stats {@link CoordinatorStats} to accumulate assignment statistics. + */ + private void drop( + final DruidCoordinatorRuntimeParams params, + final DataSegment segment, + final CoordinatorStats stats + ) + { + final DruidCluster druidCluster = params.getDruidCluster(); + + // This enforces that loading is completed before we attempt to drop stuffs as a safety measure. + if (loadingInProgress(druidCluster)) { + return; + } + + for (final Object2IntMap.Entry entry : currentReplicants.object2IntEntrySet()) { + final String tier = entry.getKey(); + + final NavigableSet holders = druidCluster.getHistoricalsByTier(tier); + + final int numDropped; + if (holders == null) { + log.makeAlert("No holders found for tier[%s]", tier).emit(); + numDropped = 0; + } else { + final int currentReplicantsInTier = entry.getIntValue(); + final int numToDrop = currentReplicantsInTier - targetReplicants.getOrDefault(tier, 0); + numDropped = dropForTier(numToDrop, holders, segment); + } + + stats.addToTieredStat(DROPPED_COUNT, tier, numDropped); + } + } + + /** + * Returns true if at least one tier in target replica assignment exists in cluster but does not have enough replicas. 
+ */ + private boolean loadingInProgress(final DruidCluster druidCluster) + { + for (final Object2IntMap.Entry entry : targetReplicants.object2IntEntrySet()) { + final String tier = entry.getKey(); + // if there are replicants loading in cluster + if (druidCluster.hasTier(tier) && entry.getIntValue() > currentReplicants.getOrDefault(tier, 0)) { + return true; + } + } + + return false; + } + + private static int dropForTier( + final int numToDrop, + final NavigableSet holdersInTier, + final DataSegment segment + ) + { + int numDropped = 0; + + // Use the reverse order to get the holders with least available size first. + final Iterator iterator = holdersInTier.descendingIterator(); + while (numDropped < numToDrop) { + if (!iterator.hasNext()) { + log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier()); + break; + } + + final ServerHolder holder = iterator.next(); + + if (holder.isServingSegment(segment)) { + holder.getPeon().dropSegment(segment, null); + ++numDropped; + } + } + + return numDropped; + } + + protected static void validateTieredReplicants(final Map tieredReplicants) { if (tieredReplicants.size() == 0) { throw new IAE("A rule with empty tiered replicants is invalid"); diff --git a/server/src/test/java/io/druid/server/coordinator/DruidClusterTest.java b/server/src/test/java/io/druid/server/coordinator/DruidClusterTest.java index 18acacaa782..d705a70ce3e 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidClusterTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidClusterTest.java @@ -22,8 +22,6 @@ package io.druid.server.coordinator; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.MinMaxPriorityQueue; -import com.google.common.collect.Ordering; import io.druid.client.ImmutableDruidDataSource; import io.druid.client.ImmutableDruidServer; import io.druid.java.util.common.Intervals; @@ -36,10 +34,14 @@ import org.junit.Before; import org.junit.Test; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.NavigableSet; import java.util.Set; +import java.util.TreeSet; import java.util.stream.Collectors; +import java.util.stream.Stream; public class DruidClusterTest { @@ -142,25 +144,23 @@ public class DruidClusterTest ), ImmutableMap.of( "tier1", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - ImmutableList.of( - new ServerHolder( - new ImmutableDruidServer( - new DruidServerMetadata("name1", "host1", null, 100L, ServerType.HISTORICAL, "tier1", 0), - 0L, - ImmutableMap.of( - "src1", - dataSources.get("src1") - ), - ImmutableMap.of( - "segment1", - segments.get(0) - ) + Stream.of( + new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host1", null, 100L, ServerType.HISTORICAL, "tier1", 0), + 0L, + ImmutableMap.of( + "src1", + dataSources.get("src1") ), - new LoadQueuePeonTester() - ) + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() ) - ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) ) ); } @@ -186,7 +186,7 @@ public class DruidClusterTest cluster.add(newRealtime); cluster.add(newHistorical); final Set expectedRealtimes = cluster.getRealtimes(); - final Map> expectedHistoricals = cluster.getHistoricals(); + final Map> expectedHistoricals = cluster.getHistoricals(); final Collection allServers = 
cluster.getAllServers(); Assert.assertEquals(4, allServers.size()); diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java index 8a67377377f..b49445905f4 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java @@ -24,13 +24,13 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.MinMaxPriorityQueue; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import io.druid.client.DruidServer; import io.druid.client.ImmutableDruidServer; import io.druid.java.util.common.DateTimes; import io.druid.metadata.MetadataRuleManager; +import io.druid.server.coordinator.helper.DruidCoordinatorBalancer; import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner; import io.druid.server.coordinator.rules.PeriodLoadRule; import io.druid.server.coordinator.rules.Rule; @@ -41,10 +41,12 @@ import org.joda.time.Interval; import org.joda.time.Period; import org.junit.Before; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.Stream; public class DruidCoordinatorBalancerProfiler { @@ -135,12 +137,15 @@ public class DruidCoordinatorBalancerProfiler .withDruidCluster( new DruidCluster( null, - ImmutableMap.>of( + ImmutableMap.of( "normal", - MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator) - .create( - serverHolderList - ) + serverHolderList.stream().collect( + Collectors.toCollection( + () -> new TreeSet<>( + DruidCoordinatorBalancer.percentUsedComparator + ) + ) + ) ) ) ) @@ -163,12 +168,15 @@ public class DruidCoordinatorBalancerProfiler SegmentReplicantLookup.make( new DruidCluster( null, - ImmutableMap.>of( + ImmutableMap.of( "normal", - MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator) - .create( - serverHolderList - ) + serverHolderList.stream().collect( + Collectors.toCollection( + () -> new TreeSet<>( + DruidCoordinatorBalancer.percentUsedComparator + ) + ) + ) ) ) ) @@ -219,15 +227,18 @@ public class DruidCoordinatorBalancerProfiler .withDruidCluster( new DruidCluster( null, - ImmutableMap.>of( + ImmutableMap.of( "normal", - MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator) - .create( - Arrays.asList( - new ServerHolder(druidServer1, fromPeon), - new ServerHolder(druidServer2, toPeon) - ) - ) + Stream.of( + new ServerHolder(druidServer1, fromPeon), + new ServerHolder(druidServer2, toPeon) + ).collect( + Collectors.toCollection( + () -> new TreeSet<>( + DruidCoordinatorBalancer.percentUsedComparator + ) + ) + ) ) ) ) diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java index 66a041744ec..ec0b841fbcd 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java @@ -23,11 +23,11 @@ import com.google.common.collect.ImmutableList; import 
com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import io.druid.client.ImmutableDruidServer; import io.druid.java.util.common.DateTimes; +import io.druid.server.coordinator.helper.DruidCoordinatorBalancer; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; import org.easymock.EasyMock; @@ -43,6 +43,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TreeSet; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -284,13 +285,14 @@ public class DruidCoordinatorBalancerTest null, ImmutableMap.of( "normal", - MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator) - .create( - IntStream - .range(0, druidServers.size()) - .mapToObj(i -> new ServerHolder(druidServers.get(i), peons.get(i))) - .collect(Collectors.toList()) - ) + IntStream + .range(0, druidServers.size()) + .mapToObj(i -> new ServerHolder(druidServers.get(i), peons.get(i))) + .collect( + Collectors.toCollection( + () -> new TreeSet<>(DruidCoordinatorBalancer.percentUsedComparator) + ) + ) ) ) ) @@ -319,7 +321,7 @@ public class DruidCoordinatorBalancerTest Map segments ) { - EasyMock.expect(druidServer.getName()).andReturn(name).atLeastOnce(); + EasyMock.expect(druidServer.getName()).andReturn(name).anyTimes(); EasyMock.expect(druidServer.getTier()).andReturn(tier).anyTimes(); EasyMock.expect(druidServer.getCurrSize()).andReturn(currentSize).atLeastOnce(); EasyMock.expect(druidServer.getMaxSize()).andReturn(maxSize).atLeastOnce(); diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index a53924c7f47..7357c9cdf72 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -22,8 +22,6 @@ package io.druid.server.coordinator; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.MinMaxPriorityQueue; -import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -52,10 +50,14 @@ import org.junit.Before; import org.junit.Test; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.Executors; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** */ @@ -143,56 +145,50 @@ public class DruidCoordinatorRuleRunnerTest null, ImmutableMap.of( "hot", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - new DruidServer( - "serverHot", - "hostHot", - null, - 1000, - ServerType.HISTORICAL, - "hot", - 0 - ).toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + new DruidServer( + "serverHot", + "hostHot", + null, + 1000, + ServerType.HISTORICAL, + "hot", + 0 + 
).toImmutableDruidServer(), + mockPeon ) - ), + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))), "normal", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - new DruidServer( - "serverNorm", - "hostNorm", - null, - 1000, - ServerType.HISTORICAL, - "normal", - 0 - ).toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + new DruidServer( + "serverNorm", + "hostNorm", + null, + 1000, + ServerType.HISTORICAL, + "normal", + 0 + ).toImmutableDruidServer(), + mockPeon ) - ), + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))), "cold", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - new DruidServer( - "serverCold", - "hostCold", - null, - 1000, - ServerType.HISTORICAL, - "cold", - 0 - ).toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + new DruidServer( + "serverCold", + "hostCold", + null, + 1000, + ServerType.HISTORICAL, + "cold", + 0 + ).toImmutableDruidServer(), + mockPeon ) - ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) ) ); @@ -258,51 +254,47 @@ public class DruidCoordinatorRuleRunnerTest null, ImmutableMap.of( "hot", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - new DruidServer( - "serverHot", - "hostHot", - null, - 1000, - ServerType.HISTORICAL, - "hot", - 0 - ).toImmutableDruidServer(), - mockPeon - ), - new ServerHolder( - new DruidServer( - "serverHot2", - "hostHot2", - null, - 1000, - ServerType.HISTORICAL, - "hot", - 0 - ).toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + new DruidServer( + "serverHot", + "hostHot", + null, + 1000, + ServerType.HISTORICAL, + "hot", + 0 + ).toImmutableDruidServer(), + mockPeon + ), + new ServerHolder( + new DruidServer( + "serverHot2", + "hostHot2", + null, + 1000, + ServerType.HISTORICAL, + "hot", + 0 + ).toImmutableDruidServer(), + mockPeon ) - ), + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))), "cold", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - new DruidServer( - "serverCold", - "hostCold", - null, - 1000, - ServerType.HISTORICAL, - "cold", - 0 - ).toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + new DruidServer( + "serverCold", + "hostCold", + null, + 1000, + ServerType.HISTORICAL, + "cold", + 0 + ).toImmutableDruidServer(), + mockPeon ) - ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) ) ); @@ -379,31 +371,27 @@ public class DruidCoordinatorRuleRunnerTest null, ImmutableMap.of( "hot", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - new DruidServer( - "serverHot", - "hostHot", - null, - 1000, - ServerType.HISTORICAL, - "hot", - 0 - ).toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + new DruidServer( + "serverHot", + "hostHot", + null, + 1000, + ServerType.HISTORICAL, + "hot", + 0 + ).toImmutableDruidServer(), + mockPeon ) - ), + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))), "normal", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - normServer.toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + 
normServer.toImmutableDruidServer(), + mockPeon ) - ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) ) ); @@ -466,22 +454,20 @@ public class DruidCoordinatorRuleRunnerTest null, ImmutableMap.of( "normal", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - new DruidServer( - "serverNorm", - "hostNorm", - null, - 1000, - ServerType.HISTORICAL, - "normal", - 0 - ).toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + new DruidServer( + "serverNorm", + "hostNorm", + null, + 1000, + ServerType.HISTORICAL, + "normal", + 0 + ).toImmutableDruidServer(), + mockPeon ) - ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) ) ); @@ -524,28 +510,28 @@ public class DruidCoordinatorRuleRunnerTest ) ) ).atLeastOnce(); - EasyMock.replay(databaseRuleManager); + + EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).anyTimes(); + EasyMock.replay(databaseRuleManager, mockPeon); DruidCluster druidCluster = new DruidCluster( null, ImmutableMap.of( "normal", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - new DruidServer( - "serverNorm", - "hostNorm", - null, - 1000, - ServerType.HISTORICAL, - "normal", - 0 - ).toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + new DruidServer( + "serverNorm", + "hostNorm", + null, + 1000, + ServerType.HISTORICAL, + "normal", + 0 + ).toImmutableDruidServer(), + mockPeon ) - ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) ) ); @@ -560,7 +546,7 @@ public class DruidCoordinatorRuleRunnerTest ruleRunner.run(params); - EasyMock.verify(emitter); + EasyMock.verify(emitter, mockPeon); } @Test @@ -605,14 +591,12 @@ public class DruidCoordinatorRuleRunnerTest null, ImmutableMap.of( "normal", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - server.toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + server.toImmutableDruidServer(), + mockPeon ) - ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) ) ); @@ -689,18 +673,16 @@ public class DruidCoordinatorRuleRunnerTest null, ImmutableMap.of( "normal", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - server1.toImmutableDruidServer(), - mockPeon - ), - new ServerHolder( - server2.toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + server1.toImmutableDruidServer(), + mockPeon + ), + new ServerHolder( + server2.toImmutableDruidServer(), + mockPeon ) - ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) ) ); @@ -779,23 +761,19 @@ public class DruidCoordinatorRuleRunnerTest null, ImmutableMap.of( "hot", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( + Stream.of( new ServerHolder( server1.toImmutableDruidServer(), mockPeon - ) ) - ), + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))), "normal", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - server2.toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + server2.toImmutableDruidServer(), + mockPeon ) - ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) ) ); @@ -870,23 +848,19 @@ public class 
DruidCoordinatorRuleRunnerTest null, ImmutableMap.of( "hot", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - server1.toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + server1.toImmutableDruidServer(), + mockPeon ) - ), + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))), "normal", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - server2.toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + server2.toImmutableDruidServer(), + mockPeon ) - ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) ) ); @@ -976,22 +950,20 @@ public class DruidCoordinatorRuleRunnerTest null, ImmutableMap.of( "normal", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - server1.toImmutableDruidServer(), - mockPeon - ), - new ServerHolder( - server2.toImmutableDruidServer(), - anotherMockPeon - ), - new ServerHolder( - server3.toImmutableDruidServer(), - anotherMockPeon - ) + Stream.of( + new ServerHolder( + server1.toImmutableDruidServer(), + mockPeon + ), + new ServerHolder( + server2.toImmutableDruidServer(), + anotherMockPeon + ), + new ServerHolder( + server3.toImmutableDruidServer(), + anotherMockPeon ) - ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) ) ); @@ -1050,34 +1022,32 @@ public class DruidCoordinatorRuleRunnerTest null, ImmutableMap.of( "hot", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - new DruidServer( - "serverHot", - "hostHot", - null, - 1000, - ServerType.HISTORICAL, - "hot", - 0 - ).toImmutableDruidServer(), - mockPeon - ), - new ServerHolder( - new DruidServer( - "serverHot2", - "hostHot2", - null, - 1000, - ServerType.HISTORICAL, - "hot", - 0 - ).toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + new DruidServer( + "serverHot", + "hostHot", + null, + 1000, + ServerType.HISTORICAL, + "hot", + 0 + ).toImmutableDruidServer(), + mockPeon + ), + new ServerHolder( + new DruidServer( + "serverHot2", + "hostHot2", + null, + 1000, + ServerType.HISTORICAL, + "hot", + 0 + ).toImmutableDruidServer(), + mockPeon ) - ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) ) ); @@ -1178,39 +1148,35 @@ public class DruidCoordinatorRuleRunnerTest null, ImmutableMap.of( "hot", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - new DruidServer( - "serverHot", - "hostHot", - null, - 1000, - ServerType.HISTORICAL, - "hot", - 0 - ).toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + new DruidServer( + "serverHot", + "hostHot", + null, + 1000, + ServerType.HISTORICAL, + "hot", + 1 + ).toImmutableDruidServer(), + mockPeon ) - ), + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))), DruidServer.DEFAULT_TIER, - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - new DruidServer( - "serverNorm", - "hostNorm", - null, - 1000, - ServerType.HISTORICAL, - DruidServer.DEFAULT_TIER, - 0 - ).toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + new DruidServer( + "serverNorm", + "hostNorm", + null, + 1000, + ServerType.HISTORICAL, + DruidServer.DEFAULT_TIER, + 0 + ).toImmutableDruidServer(), 
+ mockPeon ) - ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) ) ); @@ -1303,18 +1269,16 @@ public class DruidCoordinatorRuleRunnerTest null, ImmutableMap.of( "normal", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - server1.toImmutableDruidServer(), - mockPeon - ), - new ServerHolder( - server2.toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + server1.toImmutableDruidServer(), + mockPeon + ), + new ServerHolder( + server2.toImmutableDruidServer(), + mockPeon ) - ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) ) ); @@ -1388,22 +1352,20 @@ public class DruidCoordinatorRuleRunnerTest null, ImmutableMap.of( DruidServer.DEFAULT_TIER, - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - new DruidServer( - "serverHot", - "hostHot", - null, - 1000, - ServerType.HISTORICAL, - DruidServer.DEFAULT_TIER, - 0 - ).toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + new DruidServer( + "serverHot", + "hostHot", + null, + 1000, + ServerType.HISTORICAL, + DruidServer.DEFAULT_TIER, + 0 + ).toImmutableDruidServer(), + mockPeon ) - ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) ) ); diff --git a/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowedTest.java b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowedTest.java index cdcdf436d90..839d5b8ec43 100644 --- a/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowedTest.java +++ b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowedTest.java @@ -22,11 +22,10 @@ package io.druid.server.coordinator.helper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.MinMaxPriorityQueue; -import com.google.common.collect.Ordering; import io.druid.client.ImmutableDruidDataSource; import io.druid.client.ImmutableDruidServer; import io.druid.java.util.common.DateTimes; +import io.druid.server.coordination.ServerType; import io.druid.server.coordinator.CoordinatorStats; import io.druid.server.coordinator.DruidCluster; import io.druid.server.coordinator.DruidCoordinator; @@ -41,6 +40,9 @@ import org.junit.Test; import java.util.Collections; import java.util.List; +import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.Stream; public class DruidCoordinatorCleanupOvershadowedTest { @@ -71,11 +73,28 @@ public class DruidCoordinatorCleanupOvershadowedTest druidCoordinatorCleanupOvershadowed = new DruidCoordinatorCleanupOvershadowed(coordinator); availableSegments = ImmutableList.of(segmentV1, segmentV0, segmentV2); - druidCluster = new DruidCluster( - null, - ImmutableMap.of("normal", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Collections.singletonList(new ServerHolder(druidServer, mockPeon)) - ))); + // Dummy values for comparisons in TreeSet + EasyMock.expect(mockPeon.getLoadQueueSize()) + .andReturn(0L) + .anyTimes(); + EasyMock.expect(druidServer.getMaxSize()) + .andReturn(0L) + .anyTimes(); + EasyMock.expect(druidServer.getCurrSize()) + .andReturn(0L) + .anyTimes(); + EasyMock.expect(druidServer.getName()) + .andReturn("") + .anyTimes(); + 
EasyMock.expect(druidServer.getHost()) + .andReturn("") + .anyTimes(); + EasyMock.expect(druidServer.getTier()) + .andReturn("") + .anyTimes(); + EasyMock.expect(druidServer.getType()) + .andReturn(ServerType.HISTORICAL) + .anyTimes(); EasyMock.expect(druidServer.getDataSources()) .andReturn(ImmutableList.of(druidDataSource)) @@ -88,6 +107,16 @@ public class DruidCoordinatorCleanupOvershadowedTest coordinator.removeSegment(segmentV0); EasyMock.expectLastCall(); EasyMock.replay(mockPeon, coordinator, druidServer, druidDataSource); + + druidCluster = new DruidCluster( + null, + ImmutableMap.of( + "normal", + Stream.of( + new ServerHolder(druidServer, mockPeon) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) + )); + DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams.newBuilder() .withAvailableSegments(availableSegments) .withCoordinatorStats(new CoordinatorStats()) diff --git a/server/src/test/java/io/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java index f4719a141b9..4da23cdd801 100644 --- a/server/src/test/java/io/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java +++ b/server/src/test/java/io/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java @@ -23,8 +23,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.MinMaxPriorityQueue; -import com.google.common.collect.Ordering; import io.druid.client.DruidServer; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.Intervals; @@ -40,7 +38,11 @@ import io.druid.timeline.partition.NoneShardSpec; import org.junit.Before; import org.junit.Test; +import java.util.Collections; import java.util.List; +import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -198,21 +200,17 @@ public class BroadcastDistributionRuleTest null, ImmutableMap.of( "hot", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Lists.newArrayList( - holdersOfLargeSegments.get(0), - holderOfSmallSegment, - holdersOfLargeSegments2.get(0) - ) - ), + Stream.of( + holdersOfLargeSegments.get(0), + holderOfSmallSegment, + holdersOfLargeSegments2.get(0) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))), DruidServer.DEFAULT_TIER, - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Lists.newArrayList( - holdersOfLargeSegments.get(1), - holdersOfLargeSegments.get(2), - holdersOfLargeSegments2.get(1) - ) - ) + Stream.of( + holdersOfLargeSegments.get(1), + holdersOfLargeSegments.get(2), + holdersOfLargeSegments2.get(1) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) ) ); } diff --git a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java index f62574c7ec0..d57808a6d61 100644 --- a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java +++ b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java @@ -23,8 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import 
com.google.common.collect.Maps; -import com.google.common.collect.MinMaxPriorityQueue; -import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -61,8 +59,12 @@ import org.junit.Before; import org.junit.Test; import java.util.Arrays; +import java.util.Collections; import java.util.Map; +import java.util.TreeSet; import java.util.concurrent.Executors; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** */ @@ -81,190 +83,216 @@ public class LoadRuleTest ) ); - private LoadQueuePeon mockPeon; private ReplicationThrottler throttler; - private DataSegment segment; + private ListeningExecutorService exec; + private BalancerStrategy balancerStrategy; + + private BalancerStrategy mockBalancerStrategy; @Before public void setUp() throws Exception { EmittingLogger.registerEmitter(emitter); emitter.start(); - mockPeon = EasyMock.createMock(LoadQueuePeon.class); - throttler = new ReplicationThrottler(2, 1); - for (String tier : Arrays.asList("hot", DruidServer.DEFAULT_TIER)) { - throttler.updateReplicationState(tier); - } - segment = createDataSegment("foo"); + throttler = EasyMock.createMock(ReplicationThrottler.class); + + exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); + + mockBalancerStrategy = EasyMock.createMock(BalancerStrategy.class); } @After public void tearDown() throws Exception { - EasyMock.verify(mockPeon); + exec.shutdown(); emitter.close(); } @Test public void testLoad() throws Exception { + EasyMock.expect(throttler.canCreateReplicant(EasyMock.anyString())).andReturn(true).anyTimes(); + + final LoadQueuePeon mockPeon = createEmptyPeon(); mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); - EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes(); - EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); - EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes(); - EasyMock.replay(mockPeon); - LoadRule rule = new LoadRule() - { - private final Map tiers = ImmutableMap.of( - "hot", 1, - DruidServer.DEFAULT_TIER, 2 - ); + LoadRule rule = createLoadRule(ImmutableMap.of( + "hot", 1, + DruidServer.DEFAULT_TIER, 2 + )); - @Override - public Map getTieredReplicants() - { - return tiers; - } + final DataSegment segment = createDataSegment("foo"); - @Override - public int getNumReplicants(String tier) - { - return tiers.get(tier); - } + throttler.registerReplicantCreation(DruidServer.DEFAULT_TIER, segment.getIdentifier(), "hostNorm"); + EasyMock.expectLastCall().once(); - @Override - public String getType() - { - return "test"; - } + EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject())) + .andDelegateTo(balancerStrategy) + .times(3); - @Override - public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp) - { - return true; - } - - @Override - public boolean appliesTo(Interval interval, DateTime referenceTimestamp) - { - return true; - } - }; + EasyMock.replay(throttler, mockPeon, mockBalancerStrategy); DruidCluster druidCluster = new DruidCluster( null, ImmutableMap.of( "hot", - 
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - new DruidServer( - "serverHot", - "hostHot", - null, - 1000, - ServerType.HISTORICAL, - "hot", - 0 - ).toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + new DruidServer( + "serverHot", + "hostHot", + null, + 1000, + ServerType.HISTORICAL, + "hot", + 1 + ).toImmutableDruidServer(), + mockPeon ) - ), + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))), DruidServer.DEFAULT_TIER, - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - new DruidServer( - "serverNorm", - "hostNorm", - null, - 1000, - ServerType.HISTORICAL, - DruidServer.DEFAULT_TIER, - 0 - ).toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + new DruidServer( + "serverNorm", + "hostNorm", + null, + 1000, + ServerType.HISTORICAL, + DruidServer.DEFAULT_TIER, + 0 + ).toImmutableDruidServer(), + mockPeon ) - ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) ) ); - ListeningExecutorService exec = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(1)); - BalancerStrategy balancerStrategy = - new CostBalancerStrategyFactory().createBalancerStrategy(exec); - CoordinatorStats stats = rule.run( null, DruidCoordinatorRuntimeParams.newBuilder() .withDruidCluster(druidCluster) .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) .withReplicationManager(throttler) - .withBalancerStrategy(balancerStrategy) + .withBalancerStrategy(mockBalancerStrategy) .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01")) .withAvailableSegments(Arrays.asList(segment)).build(), segment ); Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot")); - Assert.assertEquals(2L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, DruidServer.DEFAULT_TIER)); - exec.shutdown(); + Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, DruidServer.DEFAULT_TIER)); + + EasyMock.verify(throttler, mockPeon, mockBalancerStrategy); + } + + @Test + public void testLoadPriority() throws Exception + { + EasyMock.expect(throttler.canCreateReplicant(EasyMock.anyString())).andReturn(false).anyTimes(); + + final LoadQueuePeon mockPeon1 = createEmptyPeon(); + final LoadQueuePeon mockPeon2 = createEmptyPeon(); + + mockPeon2.loadSegment(EasyMock.anyObject(), EasyMock.isNull()); + EasyMock.expectLastCall().once(); + + EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject())) + .andDelegateTo(balancerStrategy) + .times(2); + + EasyMock.replay(throttler, mockPeon1, mockPeon2, mockBalancerStrategy); + + final LoadRule rule = createLoadRule(ImmutableMap.of( + "tier1", 10, + "tier2", 10 + )); + + final DruidCluster druidCluster = new DruidCluster( + null, + ImmutableMap.of( + "tier1", + Stream.of( + new ServerHolder( + new DruidServer( + "server1", + "host1", + null, + 1000, + ServerType.HISTORICAL, + "tier1", + 0 + ).toImmutableDruidServer(), + mockPeon1 + ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))), + "tier2", + Stream.of( + new ServerHolder( + new DruidServer( + "server2", + "host2", + null, + 1000, + ServerType.HISTORICAL, + "tier2", + 1 + ).toImmutableDruidServer(), + mockPeon2 + ), + new ServerHolder( + new DruidServer( + "server3", + "host3", + null, + 1000, + ServerType.HISTORICAL, + "tier2", + 1 + ).toImmutableDruidServer(), + mockPeon2 + ) + 
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) + ) + ); + + final DataSegment segment = createDataSegment("foo"); + + final CoordinatorStats stats = rule.run( + null, + DruidCoordinatorRuntimeParams.newBuilder() + .withDruidCluster(druidCluster) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) + .withReplicationManager(throttler) + .withBalancerStrategy(mockBalancerStrategy) + .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01")) + .withAvailableSegments(Collections.singletonList(segment)).build(), + segment + ); + + Assert.assertEquals(0L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "tier1")); + Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "tier2")); + + EasyMock.verify(throttler, mockPeon1, mockPeon2, mockBalancerStrategy); } @Test public void testDrop() throws Exception { + final LoadQueuePeon mockPeon = createEmptyPeon(); mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); - EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes(); - EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).anyTimes(); - EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes(); - EasyMock.replay(mockPeon); + EasyMock.replay(throttler, mockPeon, mockBalancerStrategy); - LoadRule rule = new LoadRule() - { - private final Map tiers = ImmutableMap.of( - "hot", 0, - DruidServer.DEFAULT_TIER, 0 - ); + LoadRule rule = createLoadRule(ImmutableMap.of( + "hot", 0, + DruidServer.DEFAULT_TIER, 0 + )); - @Override - public Map getTieredReplicants() - { - return tiers; - } - - @Override - public int getNumReplicants(String tier) - { - return tiers.get(tier); - } - - @Override - public String getType() - { - return "test"; - } - - @Override - public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp) - { - return true; - } - - @Override - public boolean appliesTo(Interval interval, DateTime referenceTimestamp) - { - return true; - } - }; + final DataSegment segment = createDataSegment("foo"); DruidServer server1 = new DruidServer( "serverHot", @@ -286,42 +314,46 @@ public class LoadRuleTest 0 ); server2.addDataSegment(segment.getIdentifier(), segment); + DruidServer server3 = new DruidServer( + "serverNormNotServing", + "hostNorm", + null, + 10, + ServerType.HISTORICAL, + DruidServer.DEFAULT_TIER, + 0 + ); DruidCluster druidCluster = new DruidCluster( null, ImmutableMap.of( "hot", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - server1.toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + server1.toImmutableDruidServer(), + mockPeon ) - ), + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))), DruidServer.DEFAULT_TIER, - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - server2.toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + server2.toImmutableDruidServer(), + mockPeon + ), + new ServerHolder( + server3.toImmutableDruidServer(), + mockPeon ) - ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) ) ); - ListeningExecutorService exec = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(1)); - BalancerStrategy balancerStrategy = - new 
CostBalancerStrategyFactory().createBalancerStrategy(exec); - CoordinatorStats stats = rule.run( null, DruidCoordinatorRuntimeParams.newBuilder() .withDruidCluster(druidCluster) .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) .withReplicationManager(throttler) - .withBalancerStrategy(balancerStrategy) + .withBalancerStrategy(mockBalancerStrategy) .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01")) .withAvailableSegments(Arrays.asList(segment)).build(), segment @@ -329,85 +361,50 @@ public class LoadRuleTest Assert.assertEquals(1L, stats.getTieredStat("droppedCount", "hot")); Assert.assertEquals(1L, stats.getTieredStat("droppedCount", DruidServer.DEFAULT_TIER)); - exec.shutdown(); + + EasyMock.verify(throttler, mockPeon); } @Test public void testLoadWithNonExistentTier() throws Exception { + final LoadQueuePeon mockPeon = createEmptyPeon(); mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); - EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes(); - EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); - EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes(); - EasyMock.replay(mockPeon); - LoadRule rule = new LoadRule() - { - private final Map tiers = ImmutableMap.of( - "nonExistentTier", 1, - "hot", 1 - ); + EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject())) + .andDelegateTo(balancerStrategy) + .times(1); - @Override - public Map getTieredReplicants() - { - return tiers; - } + EasyMock.replay(throttler, mockPeon, mockBalancerStrategy); - @Override - public int getNumReplicants(String tier) - { - return tiers.get(tier); - } - - @Override - public String getType() - { - return "test"; - } - - @Override - public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp) - { - return true; - } - - @Override - public boolean appliesTo(Interval interval, DateTime referenceTimestamp) - { - return true; - } - }; + LoadRule rule = createLoadRule(ImmutableMap.of( + "nonExistentTier", 1, + "hot", 1 + )); DruidCluster druidCluster = new DruidCluster( null, ImmutableMap.of( "hot", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - new DruidServer( - "serverHot", - "hostHot", - null, - 1000, - ServerType.HISTORICAL, - "hot", - 0 - ).toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + new DruidServer( + "serverHot", + "hostHot", + null, + 1000, + ServerType.HISTORICAL, + "hot", + 0 + ).toImmutableDruidServer(), + mockPeon ) - ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) ) ); - ListeningExecutorService exec = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(1)); - BalancerStrategy balancerStrategy = - new CostBalancerStrategyFactory().createBalancerStrategy(exec); + final DataSegment segment = createDataSegment("foo"); CoordinatorStats stats = rule.run( null, @@ -415,64 +412,32 @@ public class LoadRuleTest .withDruidCluster(druidCluster) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) .withReplicationManager(throttler) - .withBalancerStrategy(balancerStrategy) + .withBalancerStrategy(mockBalancerStrategy) .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01")) 
.withAvailableSegments(Arrays.asList(segment)).build(), segment ); Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot")); - exec.shutdown(); + + EasyMock.verify(throttler, mockPeon, mockBalancerStrategy); } @Test public void testDropWithNonExistentTier() throws Exception { + final LoadQueuePeon mockPeon = createEmptyPeon(); mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); - EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes(); - EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).anyTimes(); - EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes(); - EasyMock.replay(mockPeon); - LoadRule rule = new LoadRule() - { - private final Map tiers = ImmutableMap.of( - "nonExistentTier", 1, - "hot", 1 - ); + EasyMock.replay(throttler, mockPeon, mockBalancerStrategy); - @Override - public Map getTieredReplicants() - { - return tiers; - } + LoadRule rule = createLoadRule(ImmutableMap.of( + "nonExistentTier", 1, + "hot", 1 + )); - @Override - public int getNumReplicants(String tier) - { - return tiers.get(tier); - } - - @Override - public String getType() - { - return "test"; - } - - @Override - public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp) - { - return true; - } - - @Override - public boolean appliesTo(Interval interval, DateTime referenceTimestamp) - { - return true; - } - }; + final DataSegment segment = createDataSegment("foo"); DruidServer server1 = new DruidServer( "serverHot", @@ -499,64 +464,128 @@ public class LoadRuleTest null, ImmutableMap.of( "hot", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - server1.toImmutableDruidServer(), - mockPeon - ), - new ServerHolder( - server2.toImmutableDruidServer(), - mockPeon - ) + Stream.of( + new ServerHolder( + server1.toImmutableDruidServer(), + mockPeon + ), + new ServerHolder( + server2.toImmutableDruidServer(), + mockPeon ) - ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) ) ); - ListeningExecutorService exec = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(1)); - BalancerStrategy balancerStrategy = - new CostBalancerStrategyFactory().createBalancerStrategy(exec); - CoordinatorStats stats = rule.run( null, DruidCoordinatorRuntimeParams.newBuilder() .withDruidCluster(druidCluster) .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) .withReplicationManager(throttler) - .withBalancerStrategy(balancerStrategy) + .withBalancerStrategy(mockBalancerStrategy) .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01")) .withAvailableSegments(Arrays.asList(segment)).build(), segment ); Assert.assertEquals(1L, stats.getTieredStat("droppedCount", "hot")); - exec.shutdown(); + + EasyMock.verify(throttler, mockPeon, mockBalancerStrategy); } @Test public void testMaxLoadingQueueSize() throws Exception { - EasyMock.replay(mockPeon); - LoadQueuePeonTester peon = new LoadQueuePeonTester(); + EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject())) + .andDelegateTo(balancerStrategy) + .times(2); - LoadRule rule = new LoadRule() + EasyMock.replay(throttler, mockBalancerStrategy); + + final LoadQueuePeonTester peon = new LoadQueuePeonTester(); + + LoadRule rule = createLoadRule(ImmutableMap.of( + "hot", 1 + )); + 
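+ // A brief note on what this fixture exercises: with
+ // maxSegmentsInNodeLoadingQueue set to 2 below, the first two runs enqueue
+ // one segment each on the single "hot" server, and the third run finds no
+ // eligible server, so stats3 records no assignment for "hot". A minimal
+ // sketch of such a queue-size gate (hypothetical names, not LoadRule's API):
+ //
+ //   final int limit = 2;
+ //   Predicate<LoadQueuePeon> eligible = limit <= 0
+ //       ? peon -> true
+ //       : peon -> peon.getNumberOfSegmentsInQueue() < limit;
+ //
+ // LoadQueuePeonTester stands in for a mock here because its load queue can
+ // actually grow as segments are assigned, which is what lets the cap bite.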
+ DruidCluster druidCluster = new DruidCluster( + null, + ImmutableMap.of( + "hot", + Stream.of( + new ServerHolder( + new DruidServer( + "serverHot", + "hostHot", + null, + 1000, + ServerType.HISTORICAL, + "hot", + 0 + ).toImmutableDruidServer(), + peon + ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) + ) + ); + + DataSegment dataSegment1 = createDataSegment("ds1"); + DataSegment dataSegment2 = createDataSegment("ds2"); + DataSegment dataSegment3 = createDataSegment("ds3"); + + DruidCoordinatorRuntimeParams params = + DruidCoordinatorRuntimeParams + .newBuilder() + .withDruidCluster(druidCluster) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) + .withReplicationManager(throttler) + .withBalancerStrategy(mockBalancerStrategy) + .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01")) + .withAvailableSegments(Arrays.asList(dataSegment1, dataSegment2, dataSegment3)) + .withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMaxSegmentsInNodeLoadingQueue(2).build()) + .build(); + + CoordinatorStats stats1 = rule.run(null, params, dataSegment1); + CoordinatorStats stats2 = rule.run(null, params, dataSegment2); + CoordinatorStats stats3 = rule.run(null, params, dataSegment3); + + Assert.assertEquals(1L, stats1.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot")); + Assert.assertEquals(1L, stats2.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot")); + Assert.assertFalse(stats3.getTiers(LoadRule.ASSIGNED_COUNT).contains("hot")); + + EasyMock.verify(throttler, mockBalancerStrategy); + } + + private DataSegment createDataSegment(String dataSource) + { + return new DataSegment( + dataSource, + Intervals.of("0/3000"), + DateTimes.nowUtc().toString(), + Maps.newHashMap(), + Lists.newArrayList(), + Lists.newArrayList(), + NoneShardSpec.instance(), + 0, + 0 + ); + } + + private static LoadRule createLoadRule(final Map tieredReplicants) + { + return new LoadRule() { - private final Map tiers = ImmutableMap.of( - "hot", 1 - ); - @Override public Map getTieredReplicants() { - return tiers; + return tieredReplicants; } @Override public int getNumReplicants(String tier) { - return tiers.get(tier); + return tieredReplicants.get(tier); } @Override @@ -577,73 +606,16 @@ public class LoadRuleTest return true; } }; - - DruidCluster druidCluster = new DruidCluster( - null, - ImmutableMap.of( - "hot", - MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( - Arrays.asList( - new ServerHolder( - new DruidServer( - "serverHot", - "hostHot", - null, - 1000, - ServerType.HISTORICAL, - "hot", - 0 - ).toImmutableDruidServer(), - peon - ) - ) - ) - ) - ); - - ListeningExecutorService exec = MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool(1)); - BalancerStrategy balancerStrategy = - new CostBalancerStrategyFactory().createBalancerStrategy(exec); - - DataSegment dataSegment1 = createDataSegment("ds1"); - DataSegment dataSegment2 = createDataSegment("ds2"); - DataSegment dataSegment3 = createDataSegment("ds3"); - - DruidCoordinatorRuntimeParams params = - DruidCoordinatorRuntimeParams - .newBuilder() - .withDruidCluster(druidCluster) - .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) - .withReplicationManager(throttler) - .withBalancerStrategy(balancerStrategy) - .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01")) - .withAvailableSegments(Arrays.asList(dataSegment1, dataSegment2, dataSegment3)) - .withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMaxSegmentsInNodeLoadingQueue(2).build()) - 
.build(); - - CoordinatorStats stats1 = rule.run(null, params, dataSegment1); - CoordinatorStats stats2 = rule.run(null, params, dataSegment2); - CoordinatorStats stats3 = rule.run(null, params, dataSegment3); - - Assert.assertEquals(1L, stats1.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot")); - Assert.assertEquals(1L, stats2.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot")); - Assert.assertEquals(0L, stats3.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot")); - exec.shutdown(); } - private DataSegment createDataSegment(String dataSource) + private static LoadQueuePeon createEmptyPeon() { - return new DataSegment( - dataSource, - Intervals.of("0/3000"), - DateTimes.nowUtc().toString(), - Maps.newHashMap(), - Lists.newArrayList(), - Lists.newArrayList(), - NoneShardSpec.instance(), - 0, - 0 - ); + final LoadQueuePeon mockPeon = EasyMock.createMock(LoadQueuePeon.class); + EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).anyTimes(); + EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes(); + EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).anyTimes(); + EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes(); + + return mockPeon; } }
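
Two idioms recur throughout the updated tests and are worth a standalone illustration.

First, every tier's servers are now collected into a TreeSet with a reversed comparator rather than a MinMaxPriorityQueue. Below is a self-contained sketch of the collector idiom, using Integer in place of ServerHolder so it runs without Druid on the classpath:

    import java.util.Collections;
    import java.util.NavigableSet;
    import java.util.TreeSet;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class ReverseOrderedTierExample
    {
      public static void main(String[] args)
      {
        // Collect into a set ordered from "best" to "worst" under reversed
        // natural ordering, mirroring how the tests build each tier's holders.
        NavigableSet<Integer> holders = Stream.of(3, 1, 2)
            .collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())));

        System.out.println(holders); // [3, 2, 1]; first() is the top-priority holder
      }
    }

One caveat of the switch: unlike MinMaxPriorityQueue, a TreeSet treats comparator-equal elements as duplicates and silently keeps only one of them, so the holders' ordering has to distinguish distinct servers.

Second, the tests replace a concrete BalancerStrategy with a mock that delegates to the real strategy through andDelegateTo, pinning the exact number of findNewSegmentHomeReplicator calls while keeping real placement behavior. A minimal, self-contained sketch of that EasyMock pattern (the Picker interface is hypothetical, not a Druid type):

    import org.easymock.EasyMock;

    public class DelegatingMockExample
    {
      interface Picker
      {
        String pick(String[] candidates);
      }

      public static void main(String[] args)
      {
        // The real implementation supplies the behavior; the mock only
        // enforces how many times the strategy is consulted.
        Picker real = candidates -> candidates[0];
        Picker mock = EasyMock.createMock(Picker.class);

        EasyMock.expect(mock.pick(EasyMock.anyObject()))
                .andDelegateTo(real)
                .times(2);
        EasyMock.replay(mock);

        mock.pick(new String[]{"serverHot"});
        mock.pick(new String[]{"serverNorm"});

        EasyMock.verify(mock); // fails unless pick() ran exactly twice
      }
    }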