Refactor: Move `UpdateCoordinatorStateAndPrepareCluster` duty out of `DruidCoordinator` (#14845)

Motivation:
- Clean up `DruidCoordinator` and move methods to classes where they are most relevant

Changes:
- No functional change
- Add duty `PrepareBalancerAndLoadQueues` to replace `UpdateCoordinatorStateAndPrepareCluster`
- Move map of `LoadQueuePeon` from `DruidCoordinator` to `LoadQueueTaskMaster`
- Make `BalancerStrategyFactory` an abstract class that owns and reuses the balancer executor (see the sketch below)
- Move reporting of used segment stats and historical capacity stats from
`CollectSegmentAndServerStats` to `PrepareBalancerAndLoadQueues`
- Move reporting of unavailable and under-replicated segment stats from
`CollectSegmentAndServerStats` to `UpdateReplicationStatus` duty
Author: Kashif Faraz, 2023-08-22 19:50:41 +05:30 (committed via GitHub)
Commit: 9376d8d6e1, parent: 14c1aff150
17 changed files with 437 additions and 389 deletions
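
For reference, a minimal usage sketch (not part of the patch) of the executor lifecycle that now lives in `BalancerStrategyFactory`. The factory classes and method signatures are taken from the diff below; the `main` wrapper is illustrative only:

import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory;
import org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;

public class BalancerExecutorLifecycleSketch
{
  public static void main(String[] args)
  {
    BalancerStrategyFactory factory = new CostBalancerStrategyFactory();

    // First call lazily creates the balancer executor with 2 threads.
    BalancerStrategy s1 = factory.createBalancerStrategy(2);

    // Same thread count: the cached executor is reused.
    BalancerStrategy s2 = factory.createBalancerStrategy(2);

    // Changed thread count: the old executor is shut down and replaced.
    BalancerStrategy s3 = factory.createBalancerStrategy(3);

    // The coordinator now calls this on stop and on losing leadership,
    // instead of managing the executor itself.
    factory.stopExecutor();
  }
}

Previously, `DruidCoordinator` cached the executor and thread count itself; moving this into the factory lets each coordinator run simply ask for a strategy with the configured thread count.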

File: DruidCoordinator.java

@@ -23,8 +23,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import com.google.inject.Inject;
 import it.unimi.dsi.fastutil.objects.Object2IntMap;
@@ -35,7 +33,6 @@ import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.DruidDataSource;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.client.ImmutableDruidDataSource;
-import org.apache.druid.client.ImmutableDruidServer;
 import org.apache.druid.client.ServerInventoryView;
 import org.apache.druid.client.coordinator.Coordinator;
 import org.apache.druid.common.config.JacksonConfigManager;
@@ -47,7 +44,6 @@ import org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Stopwatch;
-import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.common.guava.Comparators;
@@ -60,7 +56,6 @@ import org.apache.druid.metadata.MetadataRuleManager;
 import org.apache.druid.metadata.SegmentsMetadataManager;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.server.DruidNode;
-import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
 import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory;
 import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
 import org.apache.druid.server.coordinator.duty.BalanceSegments;
@@ -70,12 +65,12 @@ import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
 import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
 import org.apache.druid.server.coordinator.duty.MarkOvershadowedSegmentsAsUnused;
+import org.apache.druid.server.coordinator.duty.PrepareBalancerAndLoadQueues;
 import org.apache.druid.server.coordinator.duty.RunRules;
 import org.apache.druid.server.coordinator.duty.UnloadUnusedSegments;
 import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
 import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
 import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
-import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
 import org.apache.druid.server.coordinator.loading.SegmentReplicaCount;
 import org.apache.druid.server.coordinator.loading.SegmentReplicationStatus;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
@@ -98,7 +93,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -144,7 +138,6 @@ public class DruidCoordinator
   private final ScheduledExecutorFactory executorFactory;
   private final Map<String, ScheduledExecutorService> dutyGroupExecutors = new HashMap<>();
   private final LoadQueueTaskMaster taskMaster;
-  private final ConcurrentHashMap<String, LoadQueuePeon> loadManagementPeons = new ConcurrentHashMap<>();
   private final SegmentLoadQueueManager loadQueueManager;
   private final ServiceAnnouncer serviceAnnouncer;
   private final DruidNode self;
@@ -171,9 +164,6 @@ public class DruidCoordinator
    */
  private volatile SegmentReplicationStatus segmentReplicationStatus = null;

-  private int cachedBalancerThreadNumber;
-  private ListeningExecutorService balancerExec;
-
  public static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = "HistoricalManagementDuties";
  private static final String METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP = "MetadataStoreManagementDuties";
  private static final String INDEXING_SERVICE_DUTIES_DUTY_GROUP = "IndexingServiceDuties";
@@ -233,7 +223,7 @@ public class DruidCoordinator
 
   public Map<String, LoadQueuePeon> getLoadManagementPeons()
   {
-    return loadManagementPeons;
+    return taskMaster.getAllPeons();
   }
 
   public Map<String, Object2LongMap<String>> getTierToDatasourceToUnderReplicatedCount(boolean useClusterView)
@@ -344,18 +334,6 @@ public class DruidCoordinator
     return coordLeaderSelector.getCurrentLeader();
   }
 
-  @VisibleForTesting
-  public int getCachedBalancerThreadNumber()
-  {
-    return cachedBalancerThreadNumber;
-  }
-
-  @VisibleForTesting
-  public ListeningExecutorService getBalancerExec()
-  {
-    return balancerExec;
-  }
-
   @LifecycleStart
   public void start()
   {
@@ -397,10 +375,7 @@ public class DruidCoordinator
       started = false;
 
       stopAllDutyGroupExecutors();
-
-      if (balancerExec != null) {
-        balancerExec.shutdownNow();
-      }
+      balancerStrategyFactory.stopExecutor();
     }
   }
@@ -440,6 +415,7 @@ public class DruidCoordinator
         config.getCoordinatorStartDelay()
     );
 
+    taskMaster.onLeaderStart();
     segmentsMetadataManager.startPollingDatabasePeriodically();
     segmentsMetadataManager.populateUsedFlagLastUpdatedAsync();
     metadataRuleManager.start();
@@ -524,22 +500,13 @@ public class DruidCoordinator
       log.info("I am no longer the leader...");
 
-      for (String server : loadManagementPeons.keySet()) {
-        LoadQueuePeon peon = loadManagementPeons.remove(server);
-        peon.stop();
-      }
-      loadManagementPeons.clear();
+      taskMaster.onLeaderStop();
 
       serviceAnnouncer.unannounce(self);
       lookupCoordinatorManager.stop();
       metadataRuleManager.stop();
       segmentsMetadataManager.stopPollingDatabasePeriodically();
       segmentsMetadataManager.stopAsyncUsedFlagLastUpdatedUpdate();
-
-      if (balancerExec != null) {
-        balancerExec.shutdownNow();
-        balancerExec = null;
-      }
+      balancerStrategyFactory.stopExecutor();
     }
   }
@@ -559,53 +526,21 @@ public class DruidCoordinator
     dutyGroupExecutors.clear();
   }
 
-  /**
-   * Resets the balancerExec if required and creates a new BalancerStrategy for
-   * the current coordinator run.
-   */
-  @VisibleForTesting
-  BalancerStrategy createBalancerStrategy(int balancerComputeThreads)
-  {
-    // Reset balancerExecutor if required
-    if (balancerExec == null) {
-      balancerExec = createNewBalancerExecutor(balancerComputeThreads);
-    } else if (cachedBalancerThreadNumber != balancerComputeThreads) {
-      log.info(
-          "'balancerComputeThreads' has changed from [%d] to [%d]",
-          cachedBalancerThreadNumber, balancerComputeThreads
-      );
-      balancerExec.shutdownNow();
-      balancerExec = createNewBalancerExecutor(balancerComputeThreads);
-    }
-
-    // Create BalancerStrategy
-    final BalancerStrategy balancerStrategy = balancerStrategyFactory.createBalancerStrategy(balancerExec);
-    log.info(
-        "Using balancer strategy[%s] with [%d] threads.",
-        balancerStrategy.getClass().getSimpleName(), balancerComputeThreads
-    );
-    return balancerStrategy;
-  }
-
-  private ListeningExecutorService createNewBalancerExecutor(int numThreads)
-  {
-    log.info("Creating new balancer executor with [%d] threads.", numThreads);
-    cachedBalancerThreadNumber = numThreads;
-    return MoreExecutors.listeningDecorator(
-        Execs.multiThreaded(numThreads, "coordinator-cost-balancer-%s")
-    );
-  }
-
   private List<CoordinatorDuty> makeHistoricalManagementDuties()
   {
     return ImmutableList.of(
-        new UpdateCoordinatorStateAndPrepareCluster(),
+        new PrepareBalancerAndLoadQueues(
+            taskMaster,
+            loadQueueManager,
+            balancerStrategyFactory,
+            serverInventoryView
+        ),
         new RunRules(segmentsMetadataManager::markSegmentsAsUnused),
         new UpdateReplicationStatus(),
         new UnloadUnusedSegments(loadQueueManager),
         new MarkOvershadowedSegmentsAsUnused(segmentsMetadataManager::markSegmentsAsUnused),
         new BalanceSegments(config.getCoordinatorPeriod()),
-        new CollectSegmentAndServerStats(DruidCoordinator.this)
+        new CollectSegmentAndServerStats(taskMaster)
     );
   }
@@ -724,7 +659,7 @@ public class DruidCoordinator
           DruidCoordinatorRuntimeParams
               .newBuilder(coordinatorStartTime)
               .withDatabaseRuleManager(metadataRuleManager)
-              .withSnapshotOfDataSourcesWithAllUsedSegments(dataSourcesSnapshot)
+              .withDataSourcesSnapshot(dataSourcesSnapshot)
               .withDynamicConfigs(getDynamicConfigs())
              .withCompactionConfig(getCompactionConfig())
              .build();
@@ -816,149 +751,6 @@ public class DruidCoordinator
     }
   }
 
-  /**
-   * This duty does the following:
-   * <ul>
-   * <li>Prepares an immutable {@link DruidCluster} consisting of {@link ServerHolder}s
-   * which represent the current state of the servers in the cluster.</li>
-   * <li>Starts and stops load peons for new and disappeared servers respectively.</li>
-   * <li>Cancels in-progress loads on all decommissioning servers. This is done
-   * here to ensure that under-replicated segments are assigned to active servers
-   * in the {@link RunRules} duty after this.</li>
-   * <li>Initializes the {@link BalancerStrategy} for the run.</li>
-   * </ul>
-   *
-   * @see #makeHistoricalManagementDuties() for the order of duties
-   */
-  private class UpdateCoordinatorStateAndPrepareCluster implements CoordinatorDuty
-  {
-    @Nullable
-    @Override
-    public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
-    {
-      List<ImmutableDruidServer> currentServers = prepareCurrentServers();
-
-      startPeonsForNewServers(currentServers);
-      stopPeonsForDisappearedServers(currentServers);
-
-      final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig();
-      final SegmentLoadingConfig segmentLoadingConfig = params.getSegmentLoadingConfig();
-
-      final DruidCluster cluster = prepareCluster(dynamicConfig, segmentLoadingConfig, currentServers);
-      cancelLoadsOnDecommissioningServers(cluster);
-
-      final BalancerStrategy balancerStrategy
-          = createBalancerStrategy(segmentLoadingConfig.getBalancerComputeThreads());
-
-      return params.buildFromExisting()
-                   .withDruidCluster(cluster)
-                   .withBalancerStrategy(balancerStrategy)
-                   .withSegmentAssignerUsing(loadQueueManager)
-                   .build();
-    }
-
-    /**
-     * Cancels all load/move operations on decommissioning servers. This should
-     * be done before initializing the SegmentReplicantLookup so that
-     * under-replicated segments can be assigned in the current run itself.
-     */
-    private void cancelLoadsOnDecommissioningServers(DruidCluster cluster)
-    {
-      final AtomicInteger cancelledCount = new AtomicInteger(0);
-      final List<ServerHolder> decommissioningServers
-          = cluster.getAllServers().stream()
-                   .filter(ServerHolder::isDecommissioning)
-                   .collect(Collectors.toList());
-
-      for (ServerHolder server : decommissioningServers) {
-        server.getQueuedSegments().forEach(
-            (segment, action) -> {
-              // Cancel the operation if it is a type of load
-              if (action.isLoad() && server.cancelOperation(action, segment)) {
-                cancelledCount.incrementAndGet();
-              }
-            }
-        );
-      }
-
-      if (cancelledCount.get() > 0) {
-        log.info(
-            "Cancelled [%d] load/move operations on [%d] decommissioning servers.",
-            cancelledCount.get(), decommissioningServers.size()
-        );
-      }
-    }
-
-    List<ImmutableDruidServer> prepareCurrentServers()
-    {
-      List<ImmutableDruidServer> currentServers = serverInventoryView
-          .getInventory()
-          .stream()
-          .filter(DruidServer::isSegmentReplicationOrBroadcastTarget)
-          .map(DruidServer::toImmutableDruidServer)
-          .collect(Collectors.toList());
-
-      if (log.isDebugEnabled()) {
-        // Display info about all segment-replicatable (historical and bridge) servers
-        log.debug("Servers");
-        for (ImmutableDruidServer druidServer : currentServers) {
-          log.debug("  %s", druidServer);
-          log.debug("  -- DataSources");
-          for (ImmutableDruidDataSource druidDataSource : druidServer.getDataSources()) {
-            log.debug("    %s", druidDataSource);
-          }
-        }
-      }
-
-      return currentServers;
-    }
-
-    void startPeonsForNewServers(List<ImmutableDruidServer> currentServers)
-    {
-      for (ImmutableDruidServer server : currentServers) {
-        loadManagementPeons.computeIfAbsent(server.getName(), serverName -> {
-          LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(server);
-          loadQueuePeon.start();
-          log.debug("Created LoadQueuePeon for server[%s].", server.getName());
-          return loadQueuePeon;
-        });
-      }
-    }
-
-    DruidCluster prepareCluster(
-        CoordinatorDynamicConfig dynamicConfig,
-        SegmentLoadingConfig segmentLoadingConfig,
-        List<ImmutableDruidServer> currentServers
-    )
-    {
-      final Set<String> decommissioningServers = dynamicConfig.getDecommissioningNodes();
-      final DruidCluster.Builder cluster = DruidCluster.builder();
-      for (ImmutableDruidServer server : currentServers) {
-        cluster.add(
-            new ServerHolder(
-                server,
-                loadManagementPeons.get(server.getName()),
-                decommissioningServers.contains(server.getHost()),
-                segmentLoadingConfig.getMaxSegmentsInLoadQueue(),
-                segmentLoadingConfig.getMaxLifetimeInLoadQueue()
-            )
-        );
-      }
-
-      return cluster.build();
-    }
-
-    void stopPeonsForDisappearedServers(List<ImmutableDruidServer> servers)
-    {
-      final Set<String> disappeared = Sets.newHashSet(loadManagementPeons.keySet());
-      for (ImmutableDruidServer server : servers) {
-        disappeared.remove(server.getName());
-      }
-      for (String name : disappeared) {
-        log.debug("Removing listener for server[%s] which is no longer there.", name);
-        LoadQueuePeon peon = loadManagementPeons.remove(name);
-        peon.stop();
-      }
-    }
-  }
-
   /**
    * Updates replication status of all used segments. This duty must run after
    * {@link RunRules} so that the number of required replicas for all segments
@@ -971,6 +763,23 @@ public class DruidCoordinator
     public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
     {
       segmentReplicationStatus = params.getSegmentReplicationStatus();
+
+      // Collect stats for unavailable and under-replicated segments
+      final CoordinatorRunStats stats = params.getCoordinatorStats();
+      getDatasourceToUnavailableSegmentCount().forEach(
+          (dataSource, numUnavailable) -> stats.add(
+              Stats.Segments.UNAVAILABLE,
+              RowKey.of(Dimension.DATASOURCE, dataSource),
+              numUnavailable
+          )
+      );
+      getTierToDatasourceToUnderReplicatedCount(false).forEach(
+          (tier, countsPerDatasource) -> countsPerDatasource.forEach(
+              (dataSource, underReplicatedCount) ->
+                  stats.addToSegmentStat(Stats.Segments.UNDER_REPLICATED, tier, dataSource, underReplicatedCount)
+          )
+      );
+
       return params;
     }

File: DruidCoordinatorRuntimeParams.java

@@ -332,7 +332,7 @@ public class DruidCoordinatorRuntimeParams
       return this;
     }
 
-    public Builder withSnapshotOfDataSourcesWithAllUsedSegments(DataSourcesSnapshot snapshot)
+    public Builder withDataSourcesSnapshot(DataSourcesSnapshot snapshot)
     {
       this.usedSegments = createUsedSegmentsSet(snapshot.iterateAllUsedSegmentsInSnapshot());
       this.dataSourcesSnapshot = snapshot;

File: BalancerStrategyFactory.java

@@ -22,6 +22,9 @@ package org.apache.druid.server.coordinator.balancer;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.logger.Logger;
 
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "strategy", defaultImpl = CostBalancerStrategyFactory.class)
 @JsonSubTypes(value = {
@@ -30,7 +33,45 @@ import com.google.common.util.concurrent.ListeningExecutorService;
     @JsonSubTypes.Type(name = "diskNormalized", value = DiskNormalizedCostBalancerStrategyFactory.class),
     @JsonSubTypes.Type(name = "random", value = RandomBalancerStrategyFactory.class)
 })
-public interface BalancerStrategyFactory
+public abstract class BalancerStrategyFactory
 {
-  BalancerStrategy createBalancerStrategy(ListeningExecutorService exec);
+  private static final Logger log = new Logger(BalancerStrategyFactory.class);
+
+  public abstract BalancerStrategy createBalancerStrategy(int numBalancerThreads);
+
+  private int cachedBalancerThreadNumber;
+  private ListeningExecutorService balancerExec;
+
+  public void stopExecutor()
+  {
+    if (balancerExec != null) {
+      balancerExec.shutdownNow();
+      balancerExec = null;
+    }
+  }
+
+  protected ListeningExecutorService getOrCreateBalancerExecutor(int balancerComputeThreads)
+  {
+    if (balancerExec == null) {
+      balancerExec = createNewBalancerExecutor(balancerComputeThreads);
+    } else if (cachedBalancerThreadNumber != balancerComputeThreads) {
+      log.info(
+          "'balancerComputeThreads' has changed from [%d] to [%d].",
+          cachedBalancerThreadNumber, balancerComputeThreads
+      );
+      balancerExec.shutdownNow();
+      balancerExec = createNewBalancerExecutor(balancerComputeThreads);
+    }
+    return balancerExec;
+  }
+
+  private ListeningExecutorService createNewBalancerExecutor(int numThreads)
+  {
+    log.info("Creating new balancer executor with [%d] threads.", numThreads);
+    cachedBalancerThreadNumber = numThreads;
+    return MoreExecutors.listeningDecorator(
+        Execs.multiThreaded(numThreads, "coordinator-cost-balancer-%s")
+    );
+  }
 }

File: CachingCostBalancerStrategyFactory.java

@@ -44,7 +44,7 @@ import java.util.concurrent.RejectedExecutionException;
  * and will be removed in future releases.
  */
 @Deprecated
-public class CachingCostBalancerStrategyFactory implements BalancerStrategyFactory
+public class CachingCostBalancerStrategyFactory extends BalancerStrategyFactory
 {
   private static final EmittingLogger LOG = new EmittingLogger(CachingCostBalancerStrategyFactory.class);
@@ -128,8 +128,9 @@ public class CachingCostBalancerStrategyFactory implements BalancerStrategyFactory
   }
 
   @Override
-  public BalancerStrategy createBalancerStrategy(final ListeningExecutorService exec)
+  public BalancerStrategy createBalancerStrategy(int numBalancerThreads)
   {
+    final ListeningExecutorService exec = getOrCreateBalancerExecutor(numBalancerThreads);
     LOG.warn(
         "'cachingCost' balancer strategy has been deprecated as it can lead to"
         + " unbalanced clusters. Use 'cost' strategy instead."

File: CostBalancerStrategyFactory.java

@@ -19,13 +19,11 @@
 
 package org.apache.druid.server.coordinator.balancer;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
-
-public class CostBalancerStrategyFactory implements BalancerStrategyFactory
+public class CostBalancerStrategyFactory extends BalancerStrategyFactory
 {
   @Override
-  public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec)
+  public CostBalancerStrategy createBalancerStrategy(int numBalancerThreads)
   {
-    return new CostBalancerStrategy(exec);
+    return new CostBalancerStrategy(getOrCreateBalancerExecutor(numBalancerThreads));
   }
 }

File: DisabledCachingCostBalancerStrategyFactory.java

@@ -19,17 +19,16 @@
 
 package org.apache.druid.server.coordinator.balancer;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
 import org.apache.druid.java.util.common.logger.Logger;
 
-public class DisabledCachingCostBalancerStrategyFactory implements BalancerStrategyFactory
+public class DisabledCachingCostBalancerStrategyFactory extends BalancerStrategyFactory
 {
   private static final Logger log = new Logger(BalancerStrategyFactory.class);
 
   @Override
-  public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec)
+  public BalancerStrategy createBalancerStrategy(int numBalancerThreads)
   {
     log.warn("Balancer strategy 'cachingCost' is disabled. Using 'cost' strategy instead.");
-    return new CostBalancerStrategy(exec);
+    return new CostBalancerStrategy(getOrCreateBalancerExecutor(numBalancerThreads));
   }
 }

File: DiskNormalizedCostBalancerStrategyFactory.java

@@ -19,13 +19,11 @@
 
 package org.apache.druid.server.coordinator.balancer;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
-
-public class DiskNormalizedCostBalancerStrategyFactory implements BalancerStrategyFactory
+public class DiskNormalizedCostBalancerStrategyFactory extends BalancerStrategyFactory
 {
   @Override
-  public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec)
+  public BalancerStrategy createBalancerStrategy(int numBalancerThreads)
   {
-    return new DiskNormalizedCostBalancerStrategy(exec);
+    return new DiskNormalizedCostBalancerStrategy(getOrCreateBalancerExecutor(numBalancerThreads));
   }
 }

File: RandomBalancerStrategyFactory.java

@@ -19,12 +19,10 @@
 
 package org.apache.druid.server.coordinator.balancer;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
-
-public class RandomBalancerStrategyFactory implements BalancerStrategyFactory
+public class RandomBalancerStrategyFactory extends BalancerStrategyFactory
 {
   @Override
-  public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec)
+  public BalancerStrategy createBalancerStrategy(int numBalancerThreads)
   {
     return new RandomBalancerStrategy();
   }

File: CollectSegmentAndServerStats.java

@@ -20,18 +20,18 @@
 package org.apache.druid.server.coordinator.duty;
 
 import com.google.common.util.concurrent.AtomicDouble;
+import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.client.ImmutableDruidServer;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.coordinator.DruidCluster;
-import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.ServerHolder;
 import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
+import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
 import org.apache.druid.server.coordinator.stats.Dimension;
 import org.apache.druid.server.coordinator.stats.RowKey;
 import org.apache.druid.server.coordinator.stats.Stats;
-import org.apache.druid.timeline.DataSegment;
 
 import java.util.Map;
 import java.util.Set;
@@ -45,11 +45,11 @@ public class CollectSegmentAndServerStats implements CoordinatorDuty
 {
   private static final Logger log = new Logger(CollectSegmentAndServerStats.class);
 
-  private final DruidCoordinator coordinator;
+  private final LoadQueueTaskMaster taskMaster;
 
-  public CollectSegmentAndServerStats(DruidCoordinator coordinator)
+  public CollectSegmentAndServerStats(LoadQueueTaskMaster taskMaster)
   {
-    this.coordinator = coordinator;
+    this.taskMaster = taskMaster;
   }
 
   @Override
@@ -57,25 +57,15 @@ public class CollectSegmentAndServerStats implements CoordinatorDuty
   {
     params.getDruidCluster().getHistoricals()
           .forEach(this::logHistoricalTierStats);
-    collectSegmentStats(params);
+    logServerDebuggingInfo(params.getDruidCluster());
+    collectLoadQueueStats(params.getCoordinatorStats());
     return params;
   }
 
-  private void collectSegmentStats(DruidCoordinatorRuntimeParams params)
+  private void collectLoadQueueStats(CoordinatorRunStats stats)
   {
-    final CoordinatorRunStats stats = params.getCoordinatorStats();
-
-    final DruidCluster cluster = params.getDruidCluster();
-    cluster.getHistoricals().forEach((tier, historicals) -> {
-      final RowKey rowKey = RowKey.of(Dimension.TIER, tier);
-      stats.add(Stats.Tier.HISTORICAL_COUNT, rowKey, historicals.size());
-      long totalCapacity = historicals.stream().map(ServerHolder::getMaxSize).reduce(0L, Long::sum);
-      stats.add(Stats.Tier.TOTAL_CAPACITY, rowKey, totalCapacity);
-    });
-
-    // Collect load queue stats
-    coordinator.getLoadManagementPeons().forEach((serverName, queuePeon) -> {
+    taskMaster.getAllPeons().forEach((serverName, queuePeon) -> {
       final RowKey rowKey = RowKey.of(Dimension.SERVER, serverName);
       stats.add(Stats.SegmentQueue.BYTES_TO_LOAD, rowKey, queuePeon.getSizeOfSegmentsToLoad());
       stats.add(Stats.SegmentQueue.NUM_TO_LOAD, rowKey, queuePeon.getSegmentsToLoad().size());
@@ -86,33 +76,6 @@ public class CollectSegmentAndServerStats implements CoordinatorDuty
           stats.add(stat, createRowKeyForServer(serverName, key.getValues()), statValue)
       );
     });
-
-    coordinator.getDatasourceToUnavailableSegmentCount().forEach(
-        (dataSource, numUnavailable) -> stats.add(
-            Stats.Segments.UNAVAILABLE,
-            RowKey.of(Dimension.DATASOURCE, dataSource),
-            numUnavailable
-        )
-    );
-
-    coordinator.getTierToDatasourceToUnderReplicatedCount(false).forEach(
-        (tier, countsPerDatasource) -> countsPerDatasource.forEach(
-            (dataSource, underReplicatedCount) ->
-                stats.addToSegmentStat(Stats.Segments.UNDER_REPLICATED, tier, dataSource, underReplicatedCount)
-        )
-    );
-
-    // Collect total segment stats
-    params.getUsedSegmentsTimelinesPerDataSource().forEach(
-        (dataSource, timeline) -> {
-          long totalSizeOfUsedSegments = timeline.iterateAllObjects().stream()
-                                                 .mapToLong(DataSegment::getSize).sum();
-          RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, dataSource);
-          stats.add(Stats.Segments.USED_BYTES, datasourceKey, totalSizeOfUsedSegments);
-          stats.add(Stats.Segments.USED, datasourceKey, timeline.getNumObjects());
-        }
-    );
   }
 
   private RowKey createRowKeyForServer(String serverName, Map<Dimension, String> dimensionValues)
@@ -151,4 +114,19 @@ public class CollectSegmentAndServerStats implements CoordinatorDuty
     );
   }
 
+  private void logServerDebuggingInfo(DruidCluster cluster)
+  {
+    if (log.isDebugEnabled()) {
+      log.debug("Servers");
+      for (ServerHolder serverHolder : cluster.getAllServers()) {
+        ImmutableDruidServer druidServer = serverHolder.getServer();
+        log.debug("  %s", druidServer);
+        log.debug("  -- DataSources");
+        for (ImmutableDruidDataSource druidDataSource : druidServer.getDataSources()) {
+          log.debug("    %s", druidDataSource);
+        }
+      }
+    }
+  }
 }

File: PrepareBalancerAndLoadQueues.java (new file)

@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.ImmutableDruidServer;
+import org.apache.druid.client.ServerInventoryView;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.DruidCluster;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
+import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory;
+import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
+import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
+import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * This duty does the following:
+ * <ul>
+ * <li>Creates an immutable {@link DruidCluster} consisting of {@link ServerHolder}s
+ * which represent the current state of the servers in the cluster.</li>
+ * <li>Starts and stops load peons for new and disappeared servers respectively.</li>
+ * <li>Cancels in-progress loads on all decommissioning servers. This is done
+ * here to ensure that under-replicated segments are assigned to active servers
+ * in the {@link RunRules} duty after this.</li>
+ * <li>Initializes the {@link BalancerStrategy} for the run.</li>
+ * </ul>
+ */
+public class PrepareBalancerAndLoadQueues implements CoordinatorDuty
+{
+  private static final Logger log = new Logger(PrepareBalancerAndLoadQueues.class);
+
+  private final LoadQueueTaskMaster taskMaster;
+  private final SegmentLoadQueueManager loadQueueManager;
+  private final ServerInventoryView serverInventoryView;
+  private final BalancerStrategyFactory balancerStrategyFactory;
+
+  public PrepareBalancerAndLoadQueues(
+      LoadQueueTaskMaster taskMaster,
+      SegmentLoadQueueManager loadQueueManager,
+      BalancerStrategyFactory balancerStrategyFactory,
+      ServerInventoryView serverInventoryView
+  )
+  {
+    this.taskMaster = taskMaster;
+    this.loadQueueManager = loadQueueManager;
+    this.balancerStrategyFactory = balancerStrategyFactory;
+    this.serverInventoryView = serverInventoryView;
+  }
+
+  @Override
+  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
+  {
+    List<ImmutableDruidServer> currentServers = prepareCurrentServers();
+    taskMaster.resetPeonsForNewServers(currentServers);
+
+    final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig();
+    final SegmentLoadingConfig segmentLoadingConfig = params.getSegmentLoadingConfig();
+
+    final DruidCluster cluster = prepareCluster(dynamicConfig, segmentLoadingConfig, currentServers);
+    cancelLoadsOnDecommissioningServers(cluster);
+
+    final CoordinatorRunStats stats = params.getCoordinatorStats();
+    collectHistoricalStats(cluster, stats);
+    collectUsedSegmentStats(params, stats);
+
+    int numBalancerThreads = params.getCoordinatorDynamicConfig().getBalancerComputeThreads();
+    final BalancerStrategy balancerStrategy = balancerStrategyFactory.createBalancerStrategy(numBalancerThreads);
+    log.info(
+        "Using balancer strategy [%s] with [%d] threads.",
+        balancerStrategy.getClass().getSimpleName(), numBalancerThreads
+    );
+
+    return params.buildFromExisting()
+                 .withDruidCluster(cluster)
+                 .withBalancerStrategy(balancerStrategy)
+                 .withSegmentAssignerUsing(loadQueueManager)
+                 .build();
+  }
+
+  /**
+   * Cancels all load/move operations on decommissioning servers. This should
+   * be done before initializing the SegmentReplicantLookup so that
+   * under-replicated segments can be assigned in the current run itself.
+   */
+  private void cancelLoadsOnDecommissioningServers(DruidCluster cluster)
+  {
+    final AtomicInteger cancelledCount = new AtomicInteger(0);
+    final List<ServerHolder> decommissioningServers
+        = cluster.getAllServers().stream()
+                 .filter(ServerHolder::isDecommissioning)
+                 .collect(Collectors.toList());
+
+    for (ServerHolder server : decommissioningServers) {
+      server.getQueuedSegments().forEach(
+          (segment, action) -> {
+            // Cancel the operation if it is a type of load
+            if (action.isLoad() && server.cancelOperation(action, segment)) {
+              cancelledCount.incrementAndGet();
+            }
+          }
+      );
+    }
+
+    if (cancelledCount.get() > 0) {
+      log.info(
+          "Cancelled [%d] load/move operations on [%d] decommissioning servers.",
+          cancelledCount.get(), decommissioningServers.size()
+      );
+    }
+  }
+
+  private List<ImmutableDruidServer> prepareCurrentServers()
+  {
+    return serverInventoryView
+        .getInventory()
+        .stream()
+        .filter(DruidServer::isSegmentReplicationOrBroadcastTarget)
+        .map(DruidServer::toImmutableDruidServer)
+        .collect(Collectors.toList());
+  }
+
+  private DruidCluster prepareCluster(
+      CoordinatorDynamicConfig dynamicConfig,
+      SegmentLoadingConfig segmentLoadingConfig,
+      List<ImmutableDruidServer> currentServers
+  )
+  {
+    final Set<String> decommissioningServers = dynamicConfig.getDecommissioningNodes();
+    final DruidCluster.Builder cluster = DruidCluster.builder();
+    for (ImmutableDruidServer server : currentServers) {
+      cluster.add(
+          new ServerHolder(
+              server,
+              taskMaster.getPeonForServer(server),
+              decommissioningServers.contains(server.getHost()),
+              segmentLoadingConfig.getMaxSegmentsInLoadQueue(),
+              segmentLoadingConfig.getMaxLifetimeInLoadQueue()
+          )
+      );
+    }
+
+    return cluster.build();
+  }
+
+  private void collectHistoricalStats(DruidCluster cluster, CoordinatorRunStats stats)
+  {
+    cluster.getHistoricals().forEach((tier, historicals) -> {
+      RowKey rowKey = RowKey.of(Dimension.TIER, tier);
+      stats.add(Stats.Tier.HISTORICAL_COUNT, rowKey, historicals.size());
+
+      long totalCapacity = historicals.stream().mapToLong(ServerHolder::getMaxSize).sum();
+      stats.add(Stats.Tier.TOTAL_CAPACITY, rowKey, totalCapacity);
+    });
+  }
+
+  private void collectUsedSegmentStats(DruidCoordinatorRuntimeParams params, CoordinatorRunStats stats)
+  {
+    params.getUsedSegmentsTimelinesPerDataSource().forEach((dataSource, timeline) -> {
+      long totalSizeOfUsedSegments = timeline.iterateAllObjects().stream()
+                                             .mapToLong(DataSegment::getSize).sum();
+
+      RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, dataSource);
+      stats.add(Stats.Segments.USED_BYTES, datasourceKey, totalSizeOfUsedSegments);
+      stats.add(Stats.Segments.USED, datasourceKey, timeline.getNumObjects());
+    });
+  }
+}

File: LoadQueueTaskMaster.java

@@ -20,22 +20,32 @@
 package org.apache.druid.server.coordinator.loading;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
 import com.google.inject.Provider;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.druid.client.ImmutableDruidServer;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
 import org.apache.druid.server.initialization.ZkPathsConfig;
 
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Provides LoadQueuePeons
  */
 public class LoadQueueTaskMaster
 {
+  private static final Logger log = new Logger(LoadQueueTaskMaster.class);
+
   private final Provider<CuratorFramework> curatorFrameworkProvider;
   private final ObjectMapper jsonMapper;
   private final ScheduledExecutorService peonExec;
@@ -45,6 +55,11 @@ public class LoadQueueTaskMaster
   private final ZkPathsConfig zkPaths;
   private final boolean httpLoading;
 
+  @GuardedBy("this")
+  private final AtomicBoolean isLeader = new AtomicBoolean(false);
+
+  private final ConcurrentHashMap<String, LoadQueuePeon> loadManagementPeons = new ConcurrentHashMap<>();
+
   public LoadQueueTaskMaster(
       Provider<CuratorFramework> curatorFrameworkProvider,
       ObjectMapper jsonMapper,
@@ -65,7 +80,7 @@ public class LoadQueueTaskMaster
     this.httpLoading = "http".equalsIgnoreCase(config.getLoadQueuePeonType());
   }
 
-  public LoadQueuePeon giveMePeon(ImmutableDruidServer server)
+  private LoadQueuePeon createPeon(ImmutableDruidServer server)
   {
     if (httpLoading) {
       return new HttpLoadQueuePeon(server.getURL(), jsonMapper, httpClient, config, peonExec, callbackExec);
@@ -81,6 +96,69 @@ public class LoadQueueTaskMaster
     }
   }
 
+  public Map<String, LoadQueuePeon> getAllPeons()
+  {
+    return loadManagementPeons;
+  }
+
+  public LoadQueuePeon getPeonForServer(ImmutableDruidServer server)
+  {
+    return loadManagementPeons.get(server.getName());
+  }
+
+  /**
+   * Creates a peon for each of the given servers, if it doesn't already exist and
+   * removes peons for servers not present in the cluster anymore.
+   * <p>
+   * This method must not run concurrently with {@link #onLeaderStart()} and
+   * {@link #onLeaderStop()} so that there are no stray peons if the Coordinator
+   * is not leader anymore.
+   */
+  public synchronized void resetPeonsForNewServers(List<ImmutableDruidServer> currentServers)
+  {
+    if (!isLeader.get()) {
+      return;
+    }
+
+    final Set<String> oldServers = Sets.newHashSet(loadManagementPeons.keySet());
+
+    // Start peons for new servers
+    for (ImmutableDruidServer server : currentServers) {
+      loadManagementPeons.computeIfAbsent(server.getName(), serverName -> {
+        LoadQueuePeon loadQueuePeon = createPeon(server);
+        loadQueuePeon.start();
+        log.debug("Created LoadQueuePeon for server[%s].", server.getName());
+        return loadQueuePeon;
+      });
+    }
+
+    // Remove peons for disappeared servers
+    for (ImmutableDruidServer server : currentServers) {
+      oldServers.remove(server.getName());
+    }
+    for (String name : oldServers) {
+      log.debug("Removing LoadQueuePeon for disappeared server[%s].", name);
+      LoadQueuePeon peon = loadManagementPeons.remove(name);
+      peon.stop();
+    }
+  }
+
+  public synchronized void onLeaderStart()
+  {
+    isLeader.set(true);
+  }
+
+  /**
+   * Stops and removes all peons.
+   */
+  public synchronized void onLeaderStop()
+  {
+    isLeader.set(false);
+    loadManagementPeons.values().forEach(LoadQueuePeon::stop);
+    loadManagementPeons.clear();
+  }
+
   public boolean isHttpLoading()
   {
     return httpLoading;
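
For reference, a minimal sketch (not part of the patch) of the peon lifecycle now owned by `LoadQueueTaskMaster`. The method signatures come from the diff above; the wrapper method and the way `taskMaster` and `currentServers` are obtained are illustrative:

import java.util.List;
import java.util.Map;

import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;

public class PeonLifecycleSketch
{
  static void leaderLifecycle(LoadQueueTaskMaster taskMaster, List<ImmutableDruidServer> currentServers)
  {
    // Called when this coordinator becomes leader: enables peon management.
    taskMaster.onLeaderStart();

    // Called on every run of PrepareBalancerAndLoadQueues: starts peons for
    // new servers and stops/removes peons for disappeared ones. A no-op when
    // this coordinator is not the leader.
    taskMaster.resetPeonsForNewServers(currentServers);

    // Peons are now looked up through the task master rather than a map
    // kept inside DruidCoordinator.
    Map<String, LoadQueuePeon> peons = taskMaster.getAllPeons();

    // Called when this coordinator stops being leader: stops and removes all peons.
    taskMaster.onLeaderStop();
  }
}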

File: DruidCoordinatorTest.java

@@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2LongMap;
 import org.apache.curator.framework.CuratorFramework;
@@ -51,7 +50,6 @@ import org.apache.druid.metadata.SegmentsMetadataManager;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
-import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategyFactory;
 import org.apache.druid.server.coordinator.compact.NewestSegmentFirstPolicy;
 import org.apache.druid.server.coordinator.duty.CompactSegments;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
@@ -583,60 +581,6 @@ public class DruidCoordinatorTest extends CuratorTestBase
     EasyMock.verify(metadataRuleManager);
   }
 
-  @Test
-  public void testBalancerThreadNumber()
-  {
-    ScheduledExecutorFactory scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class);
-    EasyMock.replay(scheduledExecutorFactory);
-
-    DruidCoordinator c = new DruidCoordinator(
-        druidCoordinatorConfig,
-        EasyMock.createNiceMock(JacksonConfigManager.class),
-        null,
-        null,
-        null,
-        null,
-        scheduledExecutorFactory,
-        null,
-        loadQueueTaskMaster,
-        null,
-        null,
-        null,
-        null,
-        null,
-        new CoordinatorCustomDutyGroups(ImmutableSet.of()),
-        new RandomBalancerStrategyFactory(),
-        null,
-        null,
-        null
-    );
-
-    // before initialization
-    Assert.assertEquals(0, c.getCachedBalancerThreadNumber());
-    Assert.assertNull(c.getBalancerExec());
-
-    // first initialization
-    c.createBalancerStrategy(5);
-    Assert.assertEquals(5, c.getCachedBalancerThreadNumber());
-    ListeningExecutorService firstExec = c.getBalancerExec();
-    Assert.assertNotNull(firstExec);
-
-    // second initialization, expect no changes as cachedBalancerThreadNumber is not changed
-    c.createBalancerStrategy(5);
-    Assert.assertEquals(5, c.getCachedBalancerThreadNumber());
-    ListeningExecutorService secondExec = c.getBalancerExec();
-    Assert.assertNotNull(secondExec);
-    Assert.assertSame(firstExec, secondExec);
-
-    // third initialization, expect executor recreated as cachedBalancerThreadNumber is changed to 10
-    c.createBalancerStrategy(10);
-    Assert.assertEquals(10, c.getCachedBalancerThreadNumber());
-    ListeningExecutorService thirdExec = c.getBalancerExec();
-    Assert.assertNotNull(thirdExec);
-    Assert.assertNotSame(secondExec, thirdExec);
-    Assert.assertNotSame(firstExec, thirdExec);
-  }
-
   @Test
   public void testCompactSegmentsDutyWhenCustomDutyGroupEmpty()
   {
@@ -767,6 +711,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
   public void testCoordinatorCustomDutyGroupsRunAsExpected() throws Exception
   {
     // Some nessesary setup to start the Coordinator
+    setupPeons(Collections.emptyMap());
     JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
     EasyMock.expect(
         configManager.watch(
@@ -805,10 +750,9 @@ public class DruidCoordinatorTest extends CuratorTestBase
     EasyMock.expect(segmentsMetadataManager.isPollingDatabasePeriodically()).andReturn(true).anyTimes();
     EasyMock.expect(segmentsMetadataManager.iterateAllUsedSegments())
             .andReturn(Collections.singletonList(dataSegment)).anyTimes();
-    EasyMock.replay(segmentsMetadataManager);
     EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
     EasyMock.expect(serverInventoryView.getInventory()).andReturn(Collections.emptyList()).anyTimes();
-    EasyMock.replay(serverInventoryView);
+    EasyMock.replay(serverInventoryView, loadQueueTaskMaster, segmentsMetadataManager);
 
     // Create CoordinatorCustomDutyGroups
     // We will have two groups and each group has one duty
@@ -942,7 +886,16 @@ public class DruidCoordinatorTest extends CuratorTestBase
 
   private void setupPeons(Map<String, LoadQueuePeon> peonMap)
   {
-    EasyMock.expect(loadQueueTaskMaster.giveMePeon(EasyMock.anyObject())).andAnswer(
+    loadQueueTaskMaster.resetPeonsForNewServers(EasyMock.anyObject());
+    EasyMock.expectLastCall().anyTimes();
+    loadQueueTaskMaster.onLeaderStart();
+    EasyMock.expectLastCall().anyTimes();
+    loadQueueTaskMaster.onLeaderStop();
+    EasyMock.expectLastCall().anyTimes();
+
+    EasyMock.expect(loadQueueTaskMaster.getAllPeons()).andReturn(peonMap).anyTimes();
+    EasyMock.expect(loadQueueTaskMaster.getPeonForServer(EasyMock.anyObject())).andAnswer(
         () -> peonMap.get(((ImmutableDruidServer) EasyMock.getCurrentArgument(0)).getName())
     ).anyTimes();
   }

File: BalancerStrategyFactoryTest.java

@@ -22,42 +22,45 @@ package org.apache.druid.server.coordinator.balancer;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.druid.segment.TestHelper;
-import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 public class BalancerStrategyFactoryTest
 {
   private final ObjectMapper MAPPER = TestHelper.makeJsonMapper();
 
-  private ListeningExecutorService executorService;
-
-  @Before
-  public void setup()
-  {
-    executorService = MoreExecutors.listeningDecorator(
-        new BlockingExecutorService("StrategyFactoryTest-%s")
-    );
-  }
-
-  @After
-  public void tearDown()
-  {
-    executorService.shutdownNow();
-  }
-
   @Test
   public void testCachingCostStrategyFallsBackToCost() throws JsonProcessingException
   {
     final String json = "{\"strategy\":\"cachingCost\"}";
     BalancerStrategyFactory factory = MAPPER.readValue(json, BalancerStrategyFactory.class);
-    BalancerStrategy strategy = factory.createBalancerStrategy(executorService);
+    BalancerStrategy strategy = factory.createBalancerStrategy(1);
 
     Assert.assertTrue(strategy instanceof CostBalancerStrategy);
     Assert.assertFalse(strategy instanceof CachingCostBalancerStrategy);
+
+    factory.stopExecutor();
+  }
+
+  @Test
+  public void testBalancerFactoryCreatesNewExecutorIfNumThreadsChanges()
+  {
+    BalancerStrategyFactory factory = new CostBalancerStrategyFactory();
+    ListeningExecutorService exec1 = factory.getOrCreateBalancerExecutor(1);
+    ListeningExecutorService exec2 = factory.getOrCreateBalancerExecutor(2);
+    Assert.assertTrue(exec1.isShutdown());
+    Assert.assertNotSame(exec1, exec2);
+
+    ListeningExecutorService exec3 = factory.getOrCreateBalancerExecutor(3);
+    Assert.assertTrue(exec2.isShutdown());
+    Assert.assertNotSame(exec2, exec3);
+
+    ListeningExecutorService exec4 = factory.getOrCreateBalancerExecutor(3);
+    Assert.assertFalse(exec3.isShutdown());
+    Assert.assertSame(exec3, exec4);
+
+    factory.stopExecutor();
   }
 }

File: BalanceSegmentsTest.java

@@ -30,7 +30,7 @@ import org.apache.druid.server.coordinator.DruidCluster;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.ServerHolder;
 import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
-import org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
+import org.apache.druid.server.coordinator.balancer.CostBalancerStrategy;
 import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
 import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
@@ -92,7 +92,7 @@ public class BalanceSegmentsTest
     server4 = new DruidServer("server4", "server4", null, 100L, ServerType.HISTORICAL, "normal", 0);
 
     balancerStrategyExecutor = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "BalanceSegmentsTest-%d"));
-    balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(balancerStrategyExecutor);
+    balancerStrategy = new CostBalancerStrategy(balancerStrategyExecutor);
 
     broadcastDatasources = Collections.singleton("datasourceBroadcast");
   }

File: CollectSegmentAndServerStatsTest.java

@@ -19,14 +19,14 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import it.unimi.dsi.fastutil.objects.Object2IntMaps;
-import it.unimi.dsi.fastutil.objects.Object2LongMaps;
+import com.google.common.collect.ImmutableMap;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.server.coordinator.DruidCluster;
-import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategy;
+import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
 import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
 import org.apache.druid.server.coordinator.stats.Stats;
 import org.junit.Assert;
@@ -36,13 +36,11 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import java.util.Collections;
-
 @RunWith(MockitoJUnitRunner.class)
 public class CollectSegmentAndServerStatsTest
 {
   @Mock
-  private DruidCoordinator mockDruidCoordinator;
+  private LoadQueueTaskMaster mockTaskMaster;
 
   @Test
   public void testCollectedSegmentStats()
@@ -55,17 +53,15 @@ public class CollectSegmentAndServerStatsTest
         .withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null, null))
         .build();
 
-    Mockito.when(mockDruidCoordinator.getDatasourceToUnavailableSegmentCount())
-           .thenReturn(Object2IntMaps.singleton("ds", 10));
-    Mockito.when(mockDruidCoordinator.getTierToDatasourceToUnderReplicatedCount(false))
-           .thenReturn(Collections.singletonMap("ds", Object2LongMaps.singleton("tier1", 100)));
+    Mockito.when(mockTaskMaster.getAllPeons())
+           .thenReturn(ImmutableMap.of("server1", new TestLoadQueuePeon()));
 
-    CoordinatorDuty duty = new CollectSegmentAndServerStats(mockDruidCoordinator);
+    CoordinatorDuty duty = new CollectSegmentAndServerStats(mockTaskMaster);
     DruidCoordinatorRuntimeParams params = duty.run(runtimeParams);
 
     CoordinatorRunStats stats = params.getCoordinatorStats();
-    Assert.assertTrue(stats.hasStat(Stats.Segments.UNAVAILABLE));
-    Assert.assertTrue(stats.hasStat(Stats.Segments.UNDER_REPLICATED));
+    Assert.assertTrue(stats.hasStat(Stats.SegmentQueue.NUM_TO_LOAD));
+    Assert.assertTrue(stats.hasStat(Stats.SegmentQueue.NUM_TO_DROP));
   }
 }

File: CompactSegmentsTest.java

@@ -1774,7 +1774,7 @@ public class CompactSegmentsTest
   {
     DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
         .newBuilder(DateTimes.nowUtc())
-        .withSnapshotOfDataSourcesWithAllUsedSegments(dataSources)
+        .withDataSourcesSnapshot(dataSources)
         .withCompactionConfig(
             new CoordinatorCompactionConfig(
                 compactionConfigs,

File: MarkOvershadowedSegmentsAsUnusedTest.java

@@ -91,7 +91,7 @@ public class MarkOvershadowedSegmentsAsUnusedTest
     DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
         .newBuilder(DateTimes.nowUtc())
-        .withSnapshotOfDataSourcesWithAllUsedSegments(
+        .withDataSourcesSnapshot(
             segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments()
         )
         .withDruidCluster(druidCluster)