From 9376d8d6e1c952a57aef1aa6e863547f95c338a7 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 22 Aug 2023 19:50:41 +0530 Subject: [PATCH] Refactor: Move `UpdateCoordinatorStateAndPrepareCluster` duty out of `DruidCoordinator` (#14845) Motivation: - Clean up `DruidCoordinator` and move methods to classes where they are most relevant Changes: - No functional change - Add duty `PrepareBalancerAndLoadQueues` to replace `UpdateCoordinatorState` - Move map of `LoadQueuePeon` from `DruidCoordinator` to `LoadQueueTaskMaster` - Make `BalancerStrategyFactory` an abstract class and keep the balancer executor here - Move reporting of used segment stats and historical capacity stats from `CollectSegmentAndServerStats` to `PrepareBalancerAndLoadQueues` - Move reporting of unavailable and under-replicated segment stats from `CollectSegmentAndServerStats` to `UpdateReplicationStatus` duty --- .../server/coordinator/DruidCoordinator.java | 253 +++--------------- .../DruidCoordinatorRuntimeParams.java | 2 +- .../balancer/BalancerStrategyFactory.java | 45 +++- .../CachingCostBalancerStrategyFactory.java | 5 +- .../balancer/CostBalancerStrategyFactory.java | 8 +- ...ledCachingCostBalancerStrategyFactory.java | 7 +- ...NormalizedCostBalancerStrategyFactory.java | 8 +- .../RandomBalancerStrategyFactory.java | 6 +- .../duty/CollectSegmentAndServerStats.java | 70 ++--- .../duty/PrepareBalancerAndLoadQueues.java | 196 ++++++++++++++ .../loading/LoadQueueTaskMaster.java | 80 +++++- .../coordinator/DruidCoordinatorTest.java | 71 +---- .../balancer/BalancerStrategyFactoryTest.java | 45 ++-- .../coordinator/duty/BalanceSegmentsTest.java | 4 +- .../CollectSegmentAndServerStatsTest.java | 22 +- .../coordinator/duty/CompactSegmentsTest.java | 2 +- .../MarkOvershadowedSegmentsAsUnusedTest.java | 2 +- 17 files changed, 437 insertions(+), 389 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 832f7790adc..faa399ba737 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -23,8 +23,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.inject.Inject; import it.unimi.dsi.fastutil.objects.Object2IntMap; @@ -35,7 +33,6 @@ import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.DruidDataSource; import org.apache.druid.client.DruidServer; import org.apache.druid.client.ImmutableDruidDataSource; -import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.client.ServerInventoryView; import org.apache.druid.client.coordinator.Coordinator; import org.apache.druid.common.config.JacksonConfigManager; @@ -47,7 +44,6 @@ import org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Stopwatch; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.guava.Comparators; @@ -60,7 +56,6 @@ import org.apache.druid.metadata.MetadataRuleManager; import org.apache.druid.metadata.SegmentsMetadataManager; import org.apache.druid.rpc.indexing.OverlordClient; import 
org.apache.druid.server.DruidNode; -import org.apache.druid.server.coordinator.balancer.BalancerStrategy; import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory; import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy; import org.apache.druid.server.coordinator.duty.BalanceSegments; @@ -70,12 +65,12 @@ import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup; import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups; import org.apache.druid.server.coordinator.duty.CoordinatorDuty; import org.apache.druid.server.coordinator.duty.MarkOvershadowedSegmentsAsUnused; +import org.apache.druid.server.coordinator.duty.PrepareBalancerAndLoadQueues; import org.apache.druid.server.coordinator.duty.RunRules; import org.apache.druid.server.coordinator.duty.UnloadUnusedSegments; import org.apache.druid.server.coordinator.loading.LoadQueuePeon; import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster; import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager; -import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig; import org.apache.druid.server.coordinator.loading.SegmentReplicaCount; import org.apache.druid.server.coordinator.loading.SegmentReplicationStatus; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; @@ -98,7 +93,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -144,7 +138,6 @@ public class DruidCoordinator private final ScheduledExecutorFactory executorFactory; private final Map dutyGroupExecutors = new HashMap<>(); private final LoadQueueTaskMaster taskMaster; - private final ConcurrentHashMap loadManagementPeons = new ConcurrentHashMap<>(); private final SegmentLoadQueueManager 
loadQueueManager; private final ServiceAnnouncer serviceAnnouncer; private final DruidNode self; @@ -171,9 +164,6 @@ public class DruidCoordinator */ private volatile SegmentReplicationStatus segmentReplicationStatus = null; - private int cachedBalancerThreadNumber; - private ListeningExecutorService balancerExec; - public static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = "HistoricalManagementDuties"; private static final String METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP = "MetadataStoreManagementDuties"; private static final String INDEXING_SERVICE_DUTIES_DUTY_GROUP = "IndexingServiceDuties"; @@ -233,7 +223,7 @@ public class DruidCoordinator public Map getLoadManagementPeons() { - return loadManagementPeons; + return taskMaster.getAllPeons(); } public Map> getTierToDatasourceToUnderReplicatedCount(boolean useClusterView) @@ -344,18 +334,6 @@ public class DruidCoordinator return coordLeaderSelector.getCurrentLeader(); } - @VisibleForTesting - public int getCachedBalancerThreadNumber() - { - return cachedBalancerThreadNumber; - } - - @VisibleForTesting - public ListeningExecutorService getBalancerExec() - { - return balancerExec; - } - @LifecycleStart public void start() { @@ -397,10 +375,7 @@ public class DruidCoordinator started = false; stopAllDutyGroupExecutors(); - - if (balancerExec != null) { - balancerExec.shutdownNow(); - } + balancerStrategyFactory.stopExecutor(); } } @@ -440,6 +415,7 @@ public class DruidCoordinator config.getCoordinatorStartDelay() ); + taskMaster.onLeaderStart(); segmentsMetadataManager.startPollingDatabasePeriodically(); segmentsMetadataManager.populateUsedFlagLastUpdatedAsync(); metadataRuleManager.start(); @@ -524,22 +500,13 @@ public class DruidCoordinator log.info("I am no longer the leader..."); - for (String server : loadManagementPeons.keySet()) { - LoadQueuePeon peon = loadManagementPeons.remove(server); - peon.stop(); - } - loadManagementPeons.clear(); - + taskMaster.onLeaderStop(); 
serviceAnnouncer.unannounce(self); lookupCoordinatorManager.stop(); metadataRuleManager.stop(); segmentsMetadataManager.stopPollingDatabasePeriodically(); segmentsMetadataManager.stopAsyncUsedFlagLastUpdatedUpdate(); - - if (balancerExec != null) { - balancerExec.shutdownNow(); - balancerExec = null; - } + balancerStrategyFactory.stopExecutor(); } } @@ -559,53 +526,21 @@ public class DruidCoordinator dutyGroupExecutors.clear(); } - /** - * Resets the balancerExec if required and creates a new BalancerStrategy for - * the current coordinator run. - */ - @VisibleForTesting - BalancerStrategy createBalancerStrategy(int balancerComputeThreads) - { - // Reset balancerExecutor if required - if (balancerExec == null) { - balancerExec = createNewBalancerExecutor(balancerComputeThreads); - } else if (cachedBalancerThreadNumber != balancerComputeThreads) { - log.info( - "'balancerComputeThreads' has changed from [%d] to [%d]", - cachedBalancerThreadNumber, balancerComputeThreads - ); - balancerExec.shutdownNow(); - balancerExec = createNewBalancerExecutor(balancerComputeThreads); - } - - // Create BalancerStrategy - final BalancerStrategy balancerStrategy = balancerStrategyFactory.createBalancerStrategy(balancerExec); - log.info( - "Using balancer strategy[%s] with [%d] threads.", - balancerStrategy.getClass().getSimpleName(), balancerComputeThreads - ); - return balancerStrategy; - } - - private ListeningExecutorService createNewBalancerExecutor(int numThreads) - { - log.info("Creating new balancer executor with [%d] threads.", numThreads); - cachedBalancerThreadNumber = numThreads; - return MoreExecutors.listeningDecorator( - Execs.multiThreaded(numThreads, "coordinator-cost-balancer-%s") - ); - } - private List makeHistoricalManagementDuties() { return ImmutableList.of( - new UpdateCoordinatorStateAndPrepareCluster(), + new PrepareBalancerAndLoadQueues( + taskMaster, + loadQueueManager, + balancerStrategyFactory, + serverInventoryView + ), new 
RunRules(segmentsMetadataManager::markSegmentsAsUnused), new UpdateReplicationStatus(), new UnloadUnusedSegments(loadQueueManager), new MarkOvershadowedSegmentsAsUnused(segmentsMetadataManager::markSegmentsAsUnused), new BalanceSegments(config.getCoordinatorPeriod()), - new CollectSegmentAndServerStats(DruidCoordinator.this) + new CollectSegmentAndServerStats(taskMaster) ); } @@ -724,7 +659,7 @@ public class DruidCoordinator DruidCoordinatorRuntimeParams .newBuilder(coordinatorStartTime) .withDatabaseRuleManager(metadataRuleManager) - .withSnapshotOfDataSourcesWithAllUsedSegments(dataSourcesSnapshot) + .withDataSourcesSnapshot(dataSourcesSnapshot) .withDynamicConfigs(getDynamicConfigs()) .withCompactionConfig(getCompactionConfig()) .build(); @@ -816,149 +751,6 @@ public class DruidCoordinator } } - /** - * This duty does the following: - *
<ul>
- * <li>Prepares an immutable {@link DruidCluster} consisting of {@link ServerHolder}s - * which represent the current state of the servers in the cluster.</li>
- * <li>Starts and stops load peons for new and disappeared servers respectively.</li>
- * <li>Cancels in-progress loads on all decommissioning servers. This is done - * here to ensure that under-replicated segments are assigned to active servers - * in the {@link RunRules} duty after this.</li>
- * <li>Initializes the {@link BalancerStrategy} for the run.</li>
- * </ul>
- * - * @see #makeHistoricalManagementDuties() for the order of duties - */ - private class UpdateCoordinatorStateAndPrepareCluster implements CoordinatorDuty - { - @Nullable - @Override - public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) - { - List currentServers = prepareCurrentServers(); - - startPeonsForNewServers(currentServers); - stopPeonsForDisappearedServers(currentServers); - - final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig(); - final SegmentLoadingConfig segmentLoadingConfig = params.getSegmentLoadingConfig(); - - final DruidCluster cluster = prepareCluster(dynamicConfig, segmentLoadingConfig, currentServers); - cancelLoadsOnDecommissioningServers(cluster); - - final BalancerStrategy balancerStrategy - = createBalancerStrategy(segmentLoadingConfig.getBalancerComputeThreads()); - return params.buildFromExisting() - .withDruidCluster(cluster) - .withBalancerStrategy(balancerStrategy) - .withSegmentAssignerUsing(loadQueueManager) - .build(); - } - - /** - * Cancels all load/move operations on decommissioning servers. This should - * be done before initializing the SegmentReplicantLookup so that - * under-replicated segments can be assigned in the current run itself. 
- */ - private void cancelLoadsOnDecommissioningServers(DruidCluster cluster) - { - final AtomicInteger cancelledCount = new AtomicInteger(0); - final List decommissioningServers - = cluster.getAllServers().stream() - .filter(ServerHolder::isDecommissioning) - .collect(Collectors.toList()); - - for (ServerHolder server : decommissioningServers) { - server.getQueuedSegments().forEach( - (segment, action) -> { - // Cancel the operation if it is a type of load - if (action.isLoad() && server.cancelOperation(action, segment)) { - cancelledCount.incrementAndGet(); - } - } - ); - } - - if (cancelledCount.get() > 0) { - log.info( - "Cancelled [%d] load/move operations on [%d] decommissioning servers.", - cancelledCount.get(), decommissioningServers.size() - ); - } - } - - List prepareCurrentServers() - { - List currentServers = serverInventoryView - .getInventory() - .stream() - .filter(DruidServer::isSegmentReplicationOrBroadcastTarget) - .map(DruidServer::toImmutableDruidServer) - .collect(Collectors.toList()); - - if (log.isDebugEnabled()) { - // Display info about all segment-replicatable (historical and bridge) servers - log.debug("Servers"); - for (ImmutableDruidServer druidServer : currentServers) { - log.debug(" %s", druidServer); - log.debug(" -- DataSources"); - for (ImmutableDruidDataSource druidDataSource : druidServer.getDataSources()) { - log.debug(" %s", druidDataSource); - } - } - } - return currentServers; - } - - void startPeonsForNewServers(List currentServers) - { - for (ImmutableDruidServer server : currentServers) { - loadManagementPeons.computeIfAbsent(server.getName(), serverName -> { - LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(server); - loadQueuePeon.start(); - log.debug("Created LoadQueuePeon for server[%s].", server.getName()); - return loadQueuePeon; - }); - } - } - - DruidCluster prepareCluster( - CoordinatorDynamicConfig dynamicConfig, - SegmentLoadingConfig segmentLoadingConfig, - List currentServers - ) - { - final Set 
decommissioningServers = dynamicConfig.getDecommissioningNodes(); - final DruidCluster.Builder cluster = DruidCluster.builder(); - for (ImmutableDruidServer server : currentServers) { - cluster.add( - new ServerHolder( - server, - loadManagementPeons.get(server.getName()), - decommissioningServers.contains(server.getHost()), - segmentLoadingConfig.getMaxSegmentsInLoadQueue(), - segmentLoadingConfig.getMaxLifetimeInLoadQueue() - ) - ); - } - return cluster.build(); - } - - void stopPeonsForDisappearedServers(List servers) - { - final Set disappeared = Sets.newHashSet(loadManagementPeons.keySet()); - for (ImmutableDruidServer server : servers) { - disappeared.remove(server.getName()); - } - for (String name : disappeared) { - log.debug("Removing listener for server[%s] which is no longer there.", name); - LoadQueuePeon peon = loadManagementPeons.remove(name); - peon.stop(); - } - } - } - /** * Updates replication status of all used segments. This duty must run after * {@link RunRules} so that the number of required replicas for all segments @@ -971,6 +763,23 @@ public class DruidCoordinator public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { segmentReplicationStatus = params.getSegmentReplicationStatus(); + + // Collect stats for unavailable and under-replicated segments + final CoordinatorRunStats stats = params.getCoordinatorStats(); + getDatasourceToUnavailableSegmentCount().forEach( + (dataSource, numUnavailable) -> stats.add( + Stats.Segments.UNAVAILABLE, + RowKey.of(Dimension.DATASOURCE, dataSource), + numUnavailable + ) + ); + getTierToDatasourceToUnderReplicatedCount(false).forEach( + (tier, countsPerDatasource) -> countsPerDatasource.forEach( + (dataSource, underReplicatedCount) -> + stats.addToSegmentStat(Stats.Segments.UNDER_REPLICATED, tier, dataSource, underReplicatedCount) + ) + ); + return params; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java index 79173683fa4..07a7bf1b2ef 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java @@ -332,7 +332,7 @@ public class DruidCoordinatorRuntimeParams return this; } - public Builder withSnapshotOfDataSourcesWithAllUsedSegments(DataSourcesSnapshot snapshot) + public Builder withDataSourcesSnapshot(DataSourcesSnapshot snapshot) { this.usedSegments = createUsedSegmentsSet(snapshot.iterateAllUsedSegmentsInSnapshot()); this.dataSourcesSnapshot = snapshot; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyFactory.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyFactory.java index d0d426f7b51..0dd05cb1b5b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyFactory.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyFactory.java @@ -22,6 +22,9 @@ package org.apache.druid.server.coordinator.balancer; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.logger.Logger; @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "strategy", defaultImpl = CostBalancerStrategyFactory.class) @JsonSubTypes(value = { @@ -30,7 +33,45 @@ import com.google.common.util.concurrent.ListeningExecutorService; @JsonSubTypes.Type(name = "diskNormalized", value = DiskNormalizedCostBalancerStrategyFactory.class), @JsonSubTypes.Type(name = "random", value = RandomBalancerStrategyFactory.class) }) -public interface 
BalancerStrategyFactory +public abstract class BalancerStrategyFactory { - BalancerStrategy createBalancerStrategy(ListeningExecutorService exec); + private static final Logger log = new Logger(BalancerStrategyFactory.class); + + public abstract BalancerStrategy createBalancerStrategy(int numBalancerThreads); + + private int cachedBalancerThreadNumber; + private ListeningExecutorService balancerExec; + + public void stopExecutor() + { + if (balancerExec != null) { + balancerExec.shutdownNow(); + balancerExec = null; + } + } + + protected ListeningExecutorService getOrCreateBalancerExecutor(int balancerComputeThreads) + { + if (balancerExec == null) { + balancerExec = createNewBalancerExecutor(balancerComputeThreads); + } else if (cachedBalancerThreadNumber != balancerComputeThreads) { + log.info( + "'balancerComputeThreads' has changed from [%d] to [%d].", + cachedBalancerThreadNumber, balancerComputeThreads + ); + balancerExec.shutdownNow(); + balancerExec = createNewBalancerExecutor(balancerComputeThreads); + } + + return balancerExec; + } + + private ListeningExecutorService createNewBalancerExecutor(int numThreads) + { + log.info("Creating new balancer executor with [%d] threads.", numThreads); + cachedBalancerThreadNumber = numThreads; + return MoreExecutors.listeningDecorator( + Execs.multiThreaded(numThreads, "coordinator-cost-balancer-%s") + ); + } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java index 89b7cf75463..8c78bff7b68 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java @@ -44,7 +44,7 @@ import java.util.concurrent.RejectedExecutionException; * and will be removed in future releases. 
*/ @Deprecated -public class CachingCostBalancerStrategyFactory implements BalancerStrategyFactory +public class CachingCostBalancerStrategyFactory extends BalancerStrategyFactory { private static final EmittingLogger LOG = new EmittingLogger(CachingCostBalancerStrategyFactory.class); @@ -128,8 +128,9 @@ public class CachingCostBalancerStrategyFactory implements BalancerStrategyFacto } @Override - public BalancerStrategy createBalancerStrategy(final ListeningExecutorService exec) + public BalancerStrategy createBalancerStrategy(int numBalancerThreads) { + final ListeningExecutorService exec = getOrCreateBalancerExecutor(numBalancerThreads); LOG.warn( "'cachingCost' balancer strategy has been deprecated as it can lead to" + " unbalanced clusters. Use 'cost' strategy instead." diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyFactory.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyFactory.java index 10d5952390a..6894123c57c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyFactory.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyFactory.java @@ -19,13 +19,11 @@ package org.apache.druid.server.coordinator.balancer; -import com.google.common.util.concurrent.ListeningExecutorService; - -public class CostBalancerStrategyFactory implements BalancerStrategyFactory +public class CostBalancerStrategyFactory extends BalancerStrategyFactory { @Override - public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec) + public CostBalancerStrategy createBalancerStrategy(int numBalancerThreads) { - return new CostBalancerStrategy(exec); + return new CostBalancerStrategy(getOrCreateBalancerExecutor(numBalancerThreads)); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/DisabledCachingCostBalancerStrategyFactory.java 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/DisabledCachingCostBalancerStrategyFactory.java index 7d2f0d96bc6..d1881d477c5 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/DisabledCachingCostBalancerStrategyFactory.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/DisabledCachingCostBalancerStrategyFactory.java @@ -19,17 +19,16 @@ package org.apache.druid.server.coordinator.balancer; -import com.google.common.util.concurrent.ListeningExecutorService; import org.apache.druid.java.util.common.logger.Logger; -public class DisabledCachingCostBalancerStrategyFactory implements BalancerStrategyFactory +public class DisabledCachingCostBalancerStrategyFactory extends BalancerStrategyFactory { private static final Logger log = new Logger(BalancerStrategyFactory.class); @Override - public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec) + public BalancerStrategy createBalancerStrategy(int numBalancerThreads) { log.warn("Balancer strategy 'cachingCost' is disabled. 
Using 'cost' strategy instead."); - return new CostBalancerStrategy(exec); + return new CostBalancerStrategy(getOrCreateBalancerExecutor(numBalancerThreads)); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyFactory.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyFactory.java index 9c404b97e55..3389f6732e1 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyFactory.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyFactory.java @@ -19,13 +19,11 @@ package org.apache.druid.server.coordinator.balancer; -import com.google.common.util.concurrent.ListeningExecutorService; - -public class DiskNormalizedCostBalancerStrategyFactory implements BalancerStrategyFactory +public class DiskNormalizedCostBalancerStrategyFactory extends BalancerStrategyFactory { @Override - public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec) + public BalancerStrategy createBalancerStrategy(int numBalancerThreads) { - return new DiskNormalizedCostBalancerStrategy(exec); + return new DiskNormalizedCostBalancerStrategy(getOrCreateBalancerExecutor(numBalancerThreads)); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategyFactory.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategyFactory.java index 2655df53380..6b97dee713d 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategyFactory.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategyFactory.java @@ -19,12 +19,10 @@ package org.apache.druid.server.coordinator.balancer; -import com.google.common.util.concurrent.ListeningExecutorService; - -public class RandomBalancerStrategyFactory implements 
BalancerStrategyFactory +public class RandomBalancerStrategyFactory extends BalancerStrategyFactory { @Override - public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec) + public BalancerStrategy createBalancerStrategy(int numBalancerThreads) { return new RandomBalancerStrategy(); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java index 8c5acaeebb6..da2f1e1a04a 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java @@ -20,18 +20,18 @@ package org.apache.druid.server.coordinator.duty; import com.google.common.util.concurrent.AtomicDouble; +import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.coordinator.DruidCluster; -import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.ServerHolder; import org.apache.druid.server.coordinator.loading.LoadQueuePeon; +import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; import org.apache.druid.server.coordinator.stats.Dimension; import org.apache.druid.server.coordinator.stats.RowKey; import org.apache.druid.server.coordinator.stats.Stats; -import org.apache.druid.timeline.DataSegment; import java.util.Map; import java.util.Set; @@ -45,11 +45,11 @@ public class CollectSegmentAndServerStats implements CoordinatorDuty { private static final Logger log = new Logger(CollectSegmentAndServerStats.class); - private final DruidCoordinator coordinator; + private 
final LoadQueueTaskMaster taskMaster; - public CollectSegmentAndServerStats(DruidCoordinator coordinator) + public CollectSegmentAndServerStats(LoadQueueTaskMaster taskMaster) { - this.coordinator = coordinator; + this.taskMaster = taskMaster; } @Override @@ -57,25 +57,15 @@ public class CollectSegmentAndServerStats implements CoordinatorDuty { params.getDruidCluster().getHistoricals() .forEach(this::logHistoricalTierStats); - collectSegmentStats(params); + logServerDebuggingInfo(params.getDruidCluster()); + collectLoadQueueStats(params.getCoordinatorStats()); return params; } - private void collectSegmentStats(DruidCoordinatorRuntimeParams params) + private void collectLoadQueueStats(CoordinatorRunStats stats) { - final CoordinatorRunStats stats = params.getCoordinatorStats(); - - final DruidCluster cluster = params.getDruidCluster(); - cluster.getHistoricals().forEach((tier, historicals) -> { - final RowKey rowKey = RowKey.of(Dimension.TIER, tier); - stats.add(Stats.Tier.HISTORICAL_COUNT, rowKey, historicals.size()); - long totalCapacity = historicals.stream().map(ServerHolder::getMaxSize).reduce(0L, Long::sum); - stats.add(Stats.Tier.TOTAL_CAPACITY, rowKey, totalCapacity); - }); - - // Collect load queue stats - coordinator.getLoadManagementPeons().forEach((serverName, queuePeon) -> { + taskMaster.getAllPeons().forEach((serverName, queuePeon) -> { final RowKey rowKey = RowKey.of(Dimension.SERVER, serverName); stats.add(Stats.SegmentQueue.BYTES_TO_LOAD, rowKey, queuePeon.getSizeOfSegmentsToLoad()); stats.add(Stats.SegmentQueue.NUM_TO_LOAD, rowKey, queuePeon.getSegmentsToLoad().size()); @@ -86,33 +76,6 @@ public class CollectSegmentAndServerStats implements CoordinatorDuty stats.add(stat, createRowKeyForServer(serverName, key.getValues()), statValue) ); }); - - coordinator.getDatasourceToUnavailableSegmentCount().forEach( - (dataSource, numUnavailable) -> stats.add( - Stats.Segments.UNAVAILABLE, - RowKey.of(Dimension.DATASOURCE, dataSource), - numUnavailable - ) - 
); - - coordinator.getTierToDatasourceToUnderReplicatedCount(false).forEach( - (tier, countsPerDatasource) -> countsPerDatasource.forEach( - (dataSource, underReplicatedCount) -> - stats.addToSegmentStat(Stats.Segments.UNDER_REPLICATED, tier, dataSource, underReplicatedCount) - ) - ); - - // Collect total segment stats - params.getUsedSegmentsTimelinesPerDataSource().forEach( - (dataSource, timeline) -> { - long totalSizeOfUsedSegments = timeline.iterateAllObjects().stream() - .mapToLong(DataSegment::getSize).sum(); - - RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, dataSource); - stats.add(Stats.Segments.USED_BYTES, datasourceKey, totalSizeOfUsedSegments); - stats.add(Stats.Segments.USED, datasourceKey, timeline.getNumObjects()); - } - ); } private RowKey createRowKeyForServer(String serverName, Map dimensionValues) @@ -151,4 +114,19 @@ public class CollectSegmentAndServerStats implements CoordinatorDuty ); } + private void logServerDebuggingInfo(DruidCluster cluster) + { + if (log.isDebugEnabled()) { + log.debug("Servers"); + for (ServerHolder serverHolder : cluster.getAllServers()) { + ImmutableDruidServer druidServer = serverHolder.getServer(); + log.debug(" %s", druidServer); + log.debug(" -- DataSources"); + for (ImmutableDruidDataSource druidDataSource : druidServer.getDataSources()) { + log.debug(" %s", druidDataSource); + } + } + } + } + } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java new file mode 100644 index 00000000000..197adce91e1 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.duty; + +import org.apache.druid.client.DruidServer; +import org.apache.druid.client.ImmutableDruidServer; +import org.apache.druid.client.ServerInventoryView; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.server.coordinator.DruidCluster; +import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.ServerHolder; +import org.apache.druid.server.coordinator.balancer.BalancerStrategy; +import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory; +import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster; +import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager; +import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig; +import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.coordinator.stats.RowKey; +import org.apache.druid.server.coordinator.stats.Stats; +import org.apache.druid.timeline.DataSegment; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +/** + * This duty does the following: + *
+ * <ul>
+ * <li>Creates an immutable {@link DruidCluster} consisting of {@link ServerHolder}s
+ * which represent the current state of the servers in the cluster.</li>
+ * <li>Starts and stops load peons for new and disappeared servers respectively.</li>
+ * <li>Cancels in-progress loads on all decommissioning servers. This is done
+ * here to ensure that under-replicated segments are assigned to active servers
+ * in the {@link RunRules} duty after this.</li>
+ * <li>Initializes the {@link BalancerStrategy} for the run.</li>
+ * </ul>
+ */ +public class PrepareBalancerAndLoadQueues implements CoordinatorDuty +{ + private static final Logger log = new Logger(PrepareBalancerAndLoadQueues.class); + + private final LoadQueueTaskMaster taskMaster; + private final SegmentLoadQueueManager loadQueueManager; + private final ServerInventoryView serverInventoryView; + private final BalancerStrategyFactory balancerStrategyFactory; + + public PrepareBalancerAndLoadQueues( + LoadQueueTaskMaster taskMaster, + SegmentLoadQueueManager loadQueueManager, + BalancerStrategyFactory balancerStrategyFactory, + ServerInventoryView serverInventoryView + ) + { + this.taskMaster = taskMaster; + this.loadQueueManager = loadQueueManager; + this.balancerStrategyFactory = balancerStrategyFactory; + this.serverInventoryView = serverInventoryView; + } + + @Override + public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) + { + List currentServers = prepareCurrentServers(); + taskMaster.resetPeonsForNewServers(currentServers); + + final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig(); + final SegmentLoadingConfig segmentLoadingConfig = params.getSegmentLoadingConfig(); + + final DruidCluster cluster = prepareCluster(dynamicConfig, segmentLoadingConfig, currentServers); + cancelLoadsOnDecommissioningServers(cluster); + + final CoordinatorRunStats stats = params.getCoordinatorStats(); + collectHistoricalStats(cluster, stats); + collectUsedSegmentStats(params, stats); + + int numBalancerThreads = params.getCoordinatorDynamicConfig().getBalancerComputeThreads(); + final BalancerStrategy balancerStrategy = balancerStrategyFactory.createBalancerStrategy(numBalancerThreads); + log.info( + "Using balancer strategy [%s] with [%d] threads.", + balancerStrategy.getClass().getSimpleName(), numBalancerThreads + ); + + return params.buildFromExisting() + .withDruidCluster(cluster) + .withBalancerStrategy(balancerStrategy) + .withSegmentAssignerUsing(loadQueueManager) + .build(); + } + + 
/** + * Cancels all load/move operations on decommissioning servers. This should + * be done before initializing the SegmentReplicantLookup so that + * under-replicated segments can be assigned in the current run itself. + */ + private void cancelLoadsOnDecommissioningServers(DruidCluster cluster) + { + final AtomicInteger cancelledCount = new AtomicInteger(0); + final List decommissioningServers + = cluster.getAllServers().stream() + .filter(ServerHolder::isDecommissioning) + .collect(Collectors.toList()); + + for (ServerHolder server : decommissioningServers) { + server.getQueuedSegments().forEach( + (segment, action) -> { + // Cancel the operation if it is a type of load + if (action.isLoad() && server.cancelOperation(action, segment)) { + cancelledCount.incrementAndGet(); + } + } + ); + } + + if (cancelledCount.get() > 0) { + log.info( + "Cancelled [%d] load/move operations on [%d] decommissioning servers.", + cancelledCount.get(), decommissioningServers.size() + ); + } + } + + private List prepareCurrentServers() + { + return serverInventoryView + .getInventory() + .stream() + .filter(DruidServer::isSegmentReplicationOrBroadcastTarget) + .map(DruidServer::toImmutableDruidServer) + .collect(Collectors.toList()); + } + + private DruidCluster prepareCluster( + CoordinatorDynamicConfig dynamicConfig, + SegmentLoadingConfig segmentLoadingConfig, + List currentServers + ) + { + final Set decommissioningServers = dynamicConfig.getDecommissioningNodes(); + final DruidCluster.Builder cluster = DruidCluster.builder(); + for (ImmutableDruidServer server : currentServers) { + cluster.add( + new ServerHolder( + server, + taskMaster.getPeonForServer(server), + decommissioningServers.contains(server.getHost()), + segmentLoadingConfig.getMaxSegmentsInLoadQueue(), + segmentLoadingConfig.getMaxLifetimeInLoadQueue() + ) + ); + } + return cluster.build(); + } + + private void collectHistoricalStats(DruidCluster cluster, CoordinatorRunStats stats) + { + 
cluster.getHistoricals().forEach((tier, historicals) -> { + RowKey rowKey = RowKey.of(Dimension.TIER, tier); + stats.add(Stats.Tier.HISTORICAL_COUNT, rowKey, historicals.size()); + + long totalCapacity = historicals.stream().mapToLong(ServerHolder::getMaxSize).sum(); + stats.add(Stats.Tier.TOTAL_CAPACITY, rowKey, totalCapacity); + }); + } + + private void collectUsedSegmentStats(DruidCoordinatorRuntimeParams params, CoordinatorRunStats stats) + { + params.getUsedSegmentsTimelinesPerDataSource().forEach((dataSource, timeline) -> { + long totalSizeOfUsedSegments = timeline.iterateAllObjects().stream() + .mapToLong(DataSegment::getSize).sum(); + + RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, dataSource); + stats.add(Stats.Segments.USED_BYTES, datasourceKey, totalSizeOfUsedSegments); + stats.add(Stats.Segments.USED, datasourceKey, timeline.getNumObjects()); + }); + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java index bc68f87bc58..7a97a61fa31 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java @@ -20,22 +20,32 @@ package org.apache.druid.server.coordinator.loading; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Sets; +import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.inject.Provider; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.utils.ZKPaths; import org.apache.druid.client.ImmutableDruidServer; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.server.coordinator.DruidCoordinatorConfig; import org.apache.druid.server.initialization.ZkPathsConfig; +import java.util.List; 
+import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; /** * Provides LoadQueuePeons */ public class LoadQueueTaskMaster { + private static final Logger log = new Logger(LoadQueueTaskMaster.class); + private final Provider curatorFrameworkProvider; private final ObjectMapper jsonMapper; private final ScheduledExecutorService peonExec; @@ -45,6 +55,11 @@ public class LoadQueueTaskMaster private final ZkPathsConfig zkPaths; private final boolean httpLoading; + @GuardedBy("this") + private final AtomicBoolean isLeader = new AtomicBoolean(false); + + private final ConcurrentHashMap loadManagementPeons = new ConcurrentHashMap<>(); + public LoadQueueTaskMaster( Provider curatorFrameworkProvider, ObjectMapper jsonMapper, @@ -65,7 +80,7 @@ public class LoadQueueTaskMaster this.httpLoading = "http".equalsIgnoreCase(config.getLoadQueuePeonType()); } - public LoadQueuePeon giveMePeon(ImmutableDruidServer server) + private LoadQueuePeon createPeon(ImmutableDruidServer server) { if (httpLoading) { return new HttpLoadQueuePeon(server.getURL(), jsonMapper, httpClient, config, peonExec, callbackExec); @@ -81,6 +96,69 @@ public class LoadQueueTaskMaster } } + public Map getAllPeons() + { + return loadManagementPeons; + } + + public LoadQueuePeon getPeonForServer(ImmutableDruidServer server) + { + return loadManagementPeons.get(server.getName()); + } + + /** + * Creates a peon for each of the given servers, if it doesn't already exist and + * removes peons for servers not present in the cluster anymore. + *

+ * This method must not run concurrently with {@link #onLeaderStart()} and + * {@link #onLeaderStop()} so that there are no stray peons if the Coordinator + * is not leader anymore. + */ + public synchronized void resetPeonsForNewServers(List currentServers) + { + if (!isLeader.get()) { + return; + } + + final Set oldServers = Sets.newHashSet(loadManagementPeons.keySet()); + + // Start peons for new servers + for (ImmutableDruidServer server : currentServers) { + loadManagementPeons.computeIfAbsent(server.getName(), serverName -> { + LoadQueuePeon loadQueuePeon = createPeon(server); + loadQueuePeon.start(); + log.debug("Created LoadQueuePeon for server[%s].", server.getName()); + return loadQueuePeon; + }); + } + + // Remove peons for disappeared servers + for (ImmutableDruidServer server : currentServers) { + oldServers.remove(server.getName()); + } + for (String name : oldServers) { + log.debug("Removing LoadQueuePeon for disappeared server[%s].", name); + LoadQueuePeon peon = loadManagementPeons.remove(name); + peon.stop(); + } + } + + public synchronized void onLeaderStart() + { + isLeader.set(true); + } + + /** + * Stops and removes all peons. 
+ */ + public synchronized void onLeaderStop() + { + isLeader.set(false); + + loadManagementPeons.values().forEach(LoadQueuePeon::stop); + loadManagementPeons.clear(); + } + public boolean isHttpLoading() { return httpLoading; diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 249dea2ce99..d1e5b7c5b24 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.util.concurrent.ListeningExecutorService; import it.unimi.dsi.fastutil.objects.Object2IntMap; import it.unimi.dsi.fastutil.objects.Object2LongMap; import org.apache.curator.framework.CuratorFramework; @@ -51,7 +50,6 @@ import org.apache.druid.metadata.SegmentsMetadataManager; import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory; -import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategyFactory; import org.apache.druid.server.coordinator.compact.NewestSegmentFirstPolicy; import org.apache.druid.server.coordinator.duty.CompactSegments; import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty; @@ -583,60 +581,6 @@ public class DruidCoordinatorTest extends CuratorTestBase EasyMock.verify(metadataRuleManager); } - @Test - public void testBalancerThreadNumber() - { - ScheduledExecutorFactory scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class); - EasyMock.replay(scheduledExecutorFactory); - - DruidCoordinator c = new DruidCoordinator( - 
druidCoordinatorConfig, - EasyMock.createNiceMock(JacksonConfigManager.class), - null, - null, - null, - null, - scheduledExecutorFactory, - null, - loadQueueTaskMaster, - null, - null, - null, - null, - null, - new CoordinatorCustomDutyGroups(ImmutableSet.of()), - new RandomBalancerStrategyFactory(), - null, - null, - null - ); - - // before initialization - Assert.assertEquals(0, c.getCachedBalancerThreadNumber()); - Assert.assertNull(c.getBalancerExec()); - - // first initialization - c.createBalancerStrategy(5); - Assert.assertEquals(5, c.getCachedBalancerThreadNumber()); - ListeningExecutorService firstExec = c.getBalancerExec(); - Assert.assertNotNull(firstExec); - - // second initialization, expect no changes as cachedBalancerThreadNumber is not changed - c.createBalancerStrategy(5); - Assert.assertEquals(5, c.getCachedBalancerThreadNumber()); - ListeningExecutorService secondExec = c.getBalancerExec(); - Assert.assertNotNull(secondExec); - Assert.assertSame(firstExec, secondExec); - - // third initialization, expect executor recreated as cachedBalancerThreadNumber is changed to 10 - c.createBalancerStrategy(10); - Assert.assertEquals(10, c.getCachedBalancerThreadNumber()); - ListeningExecutorService thirdExec = c.getBalancerExec(); - Assert.assertNotNull(thirdExec); - Assert.assertNotSame(secondExec, thirdExec); - Assert.assertNotSame(firstExec, thirdExec); - } - @Test public void testCompactSegmentsDutyWhenCustomDutyGroupEmpty() { @@ -767,6 +711,7 @@ public class DruidCoordinatorTest extends CuratorTestBase public void testCoordinatorCustomDutyGroupsRunAsExpected() throws Exception { // Some nessesary setup to start the Coordinator + setupPeons(Collections.emptyMap()); JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class); EasyMock.expect( configManager.watch( @@ -805,10 +750,9 @@ public class DruidCoordinatorTest extends CuratorTestBase 
EasyMock.expect(segmentsMetadataManager.isPollingDatabasePeriodically()).andReturn(true).anyTimes(); EasyMock.expect(segmentsMetadataManager.iterateAllUsedSegments()) .andReturn(Collections.singletonList(dataSegment)).anyTimes(); - EasyMock.replay(segmentsMetadataManager); EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes(); EasyMock.expect(serverInventoryView.getInventory()).andReturn(Collections.emptyList()).anyTimes(); - EasyMock.replay(serverInventoryView); + EasyMock.replay(serverInventoryView, loadQueueTaskMaster, segmentsMetadataManager); // Create CoordinatorCustomDutyGroups // We will have two groups and each group has one duty @@ -942,7 +886,16 @@ public class DruidCoordinatorTest extends CuratorTestBase private void setupPeons(Map peonMap) { - EasyMock.expect(loadQueueTaskMaster.giveMePeon(EasyMock.anyObject())).andAnswer( + loadQueueTaskMaster.resetPeonsForNewServers(EasyMock.anyObject()); + EasyMock.expectLastCall().anyTimes(); + loadQueueTaskMaster.onLeaderStart(); + EasyMock.expectLastCall().anyTimes(); + loadQueueTaskMaster.onLeaderStop(); + EasyMock.expectLastCall().anyTimes(); + + EasyMock.expect(loadQueueTaskMaster.getAllPeons()).andReturn(peonMap).anyTimes(); + + EasyMock.expect(loadQueueTaskMaster.getPeonForServer(EasyMock.anyObject())).andAnswer( () -> peonMap.get(((ImmutableDruidServer) EasyMock.getCurrentArgument(0)).getName()) ).anyTimes(); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyFactoryTest.java b/server/src/test/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyFactoryTest.java index d08b8ff104e..f8234b36dae 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyFactoryTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyFactoryTest.java @@ -22,42 +22,45 @@ package org.apache.druid.server.coordinator.balancer; import 
com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.segment.TestHelper; -import org.apache.druid.server.coordinator.simulate.BlockingExecutorService; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; public class BalancerStrategyFactoryTest { private final ObjectMapper MAPPER = TestHelper.makeJsonMapper(); - private ListeningExecutorService executorService; - - @Before - public void setup() - { - executorService = MoreExecutors.listeningDecorator( - new BlockingExecutorService("StrategyFactoryTest-%s") - ); - } - - @After - public void tearDown() - { - executorService.shutdownNow(); - } - @Test public void testCachingCostStrategyFallsBackToCost() throws JsonProcessingException { final String json = "{\"strategy\":\"cachingCost\"}"; BalancerStrategyFactory factory = MAPPER.readValue(json, BalancerStrategyFactory.class); - BalancerStrategy strategy = factory.createBalancerStrategy(executorService); + BalancerStrategy strategy = factory.createBalancerStrategy(1); Assert.assertTrue(strategy instanceof CostBalancerStrategy); Assert.assertFalse(strategy instanceof CachingCostBalancerStrategy); + + factory.stopExecutor(); + } + + @Test + public void testBalancerFactoryCreatesNewExecutorIfNumThreadsChanges() + { + BalancerStrategyFactory factory = new CostBalancerStrategyFactory(); + ListeningExecutorService exec1 = factory.getOrCreateBalancerExecutor(1); + ListeningExecutorService exec2 = factory.getOrCreateBalancerExecutor(2); + + Assert.assertTrue(exec1.isShutdown()); + Assert.assertNotSame(exec1, exec2); + + ListeningExecutorService exec3 = factory.getOrCreateBalancerExecutor(3); + Assert.assertTrue(exec2.isShutdown()); + Assert.assertNotSame(exec2, exec3); + + ListeningExecutorService exec4 = 
factory.getOrCreateBalancerExecutor(3); + Assert.assertFalse(exec3.isShutdown()); + Assert.assertSame(exec3, exec4); + + factory.stopExecutor(); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java index aada7b3214b..1f51565bcd5 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java @@ -30,7 +30,7 @@ import org.apache.druid.server.coordinator.DruidCluster; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.ServerHolder; import org.apache.druid.server.coordinator.balancer.BalancerStrategy; -import org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory; +import org.apache.druid.server.coordinator.balancer.CostBalancerStrategy; import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager; import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; @@ -92,7 +92,7 @@ public class BalanceSegmentsTest server4 = new DruidServer("server4", "server4", null, 100L, ServerType.HISTORICAL, "normal", 0); balancerStrategyExecutor = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "BalanceSegmentsTest-%d")); - balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(balancerStrategyExecutor); + balancerStrategy = new CostBalancerStrategy(balancerStrategyExecutor); broadcastDatasources = Collections.singleton("datasourceBroadcast"); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStatsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStatsTest.java index 1090ef573e4..9921281fde8 100644 --- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStatsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStatsTest.java @@ -19,14 +19,14 @@ package org.apache.druid.server.coordinator.duty; -import it.unimi.dsi.fastutil.objects.Object2IntMaps; -import it.unimi.dsi.fastutil.objects.Object2LongMaps; +import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.server.coordinator.DruidCluster; -import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategy; +import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster; import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager; +import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; import org.apache.druid.server.coordinator.stats.Stats; import org.junit.Assert; @@ -36,13 +36,11 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; -import java.util.Collections; - @RunWith(MockitoJUnitRunner.class) public class CollectSegmentAndServerStatsTest { @Mock - private DruidCoordinator mockDruidCoordinator; + private LoadQueueTaskMaster mockTaskMaster; @Test public void testCollectedSegmentStats() @@ -55,17 +53,15 @@ public class CollectSegmentAndServerStatsTest .withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null, null)) .build(); - Mockito.when(mockDruidCoordinator.getDatasourceToUnavailableSegmentCount()) - .thenReturn(Object2IntMaps.singleton("ds", 10)); - Mockito.when(mockDruidCoordinator.getTierToDatasourceToUnderReplicatedCount(false)) - .thenReturn(Collections.singletonMap("ds", Object2LongMaps.singleton("tier1", 100))); + 
Mockito.when(mockTaskMaster.getAllPeons()) + .thenReturn(ImmutableMap.of("server1", new TestLoadQueuePeon())); - CoordinatorDuty duty = new CollectSegmentAndServerStats(mockDruidCoordinator); + CoordinatorDuty duty = new CollectSegmentAndServerStats(mockTaskMaster); DruidCoordinatorRuntimeParams params = duty.run(runtimeParams); CoordinatorRunStats stats = params.getCoordinatorStats(); - Assert.assertTrue(stats.hasStat(Stats.Segments.UNAVAILABLE)); - Assert.assertTrue(stats.hasStat(Stats.Segments.UNDER_REPLICATED)); + Assert.assertTrue(stats.hasStat(Stats.SegmentQueue.NUM_TO_LOAD)); + Assert.assertTrue(stats.hasStat(Stats.SegmentQueue.NUM_TO_DROP)); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index 6884d259754..2926dbd6d70 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -1774,7 +1774,7 @@ public class CompactSegmentsTest { DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams .newBuilder(DateTimes.nowUtc()) - .withSnapshotOfDataSourcesWithAllUsedSegments(dataSources) + .withDataSourcesSnapshot(dataSources) .withCompactionConfig( new CoordinatorCompactionConfig( compactionConfigs, diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnusedTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnusedTest.java index ce89cd616bd..acbf89e3225 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnusedTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnusedTest.java @@ -91,7 +91,7 @@ public class MarkOvershadowedSegmentsAsUnusedTest DruidCoordinatorRuntimeParams params = 
DruidCoordinatorRuntimeParams .newBuilder(DateTimes.nowUtc()) - .withSnapshotOfDataSourcesWithAllUsedSegments( + .withDataSourcesSnapshot( segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments() ) .withDruidCluster(druidCluster)