Use separate executor for each coordinator duty group (#14869)

Changes:
- Use separate executor for every duty group
- This change is thread-safe as every duty group uses its own copy of
`DruidCoordinatorRuntimeParams` and does not share any other mutable instances
with other duty groups.
- With the exception of `HistoricalManagementDuties`, duty groups are typically not
very compute intensive and mostly perform database or HTTP I/O. So, coordinator
resources would still mostly be available for `HistoricalManagementDuties`.
This commit is contained in:
Kashif Faraz 2023-08-21 15:53:22 +05:30 committed by GitHub
parent 9065ef1aff
commit 07a193a142
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 36 additions and 20 deletions

View File

@ -20,12 +20,12 @@
package org.apache.druid.server.coordinator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntMaps;
@ -46,6 +46,7 @@ import org.apache.druid.guice.annotations.CoordinatorIndexingServiceDuty;
import org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
@ -99,7 +100,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@ -141,7 +141,8 @@ public class DruidCoordinator
private final ServiceEmitter emitter;
private final OverlordClient overlordClient;
private final ScheduledExecutorService exec;
private final ScheduledExecutorFactory executorFactory;
private final Map<String, ScheduledExecutorService> dutyGroupExecutors = new HashMap<>();
private final LoadQueueTaskMaster taskMaster;
private final ConcurrentHashMap<String, LoadQueuePeon> loadManagementPeons = new ConcurrentHashMap<>();
private final SegmentLoadQueueManager loadQueueManager;
@ -216,7 +217,7 @@ public class DruidCoordinator
this.metadataStoreManagementDuties = metadataStoreManagementDuties;
this.customDutyGroups = customDutyGroups;
this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");
this.executorFactory = scheduledExecutorFactory;
this.balancerStrategyFactory = balancerStrategyFactory;
this.lookupCoordinatorManager = lookupCoordinatorManager;
@ -395,7 +396,7 @@ public class DruidCoordinator
started = false;
exec.shutdownNow();
stopAllDutyGroupExecutors();
if (balancerExec != null) {
balancerExec.shutdownNow();
@ -492,12 +493,10 @@ public class DruidCoordinator
}
for (final DutiesRunnable dutiesRunnable : dutiesRunnables) {
// CompactSegmentsDuty can takes a non trival amount of time to complete.
// Hence, we schedule at fixed rate to make sure the other tasks still run at approximately every
// config.getCoordinatorIndexingPeriod() period. Note that cautious should be taken
// if setting config.getCoordinatorIndexingPeriod() lower than the default value.
// Several coordinator duties can take a non trival amount of time to complete.
// Hence, we schedule each duty group on a dedicated executor
ScheduledExecutors.scheduleAtFixedRate(
exec,
getOrCreateDutyGroupExecutor(dutiesRunnable.dutyGroupName),
config.getCoordinatorStartDelay(),
dutiesRunnable.getPeriod(),
() -> {
@ -544,6 +543,22 @@ public class DruidCoordinator
}
}
@GuardedBy("lock")
private ScheduledExecutorService getOrCreateDutyGroupExecutor(String dutyGroup)
{
return dutyGroupExecutors.computeIfAbsent(
dutyGroup,
group -> executorFactory.create(1, "Coordinator-Exec-" + dutyGroup + "-%d")
);
}
@GuardedBy("lock")
private void stopAllDutyGroupExecutors()
{
dutyGroupExecutors.values().forEach(ScheduledExecutorService::shutdownNow);
dutyGroupExecutors.clear();
}
/**
* Resets the balancerExec if required and creates a new BalancerStrategy for
* the current coordinator run.
@ -733,7 +748,7 @@ public class DruidCoordinator
&& coordLeaderSelector.isLeader()
&& startingLeaderCounter == coordLeaderSelector.localTerm()) {
dutyRunTime.reset().start();
dutyRunTime.restart();
params = duty.run(params);
dutyRunTime.stop();
@ -743,7 +758,7 @@ public class DruidCoordinator
return;
} else {
final RowKey rowKey = RowKey.of(Dimension.DUTY, dutyName);
final long dutyRunMillis = dutyRunTime.elapsed(TimeUnit.MILLISECONDS);
final long dutyRunMillis = dutyRunTime.millisElapsed();
params.getCoordinatorStats().add(Stats.CoordinatorRun.DUTY_RUN_TIME, rowKey, dutyRunMillis);
}
}
@ -769,7 +784,8 @@ public class DruidCoordinator
}
// Emit the runtime of the full DutiesRunnable
final long runMillis = groupRunTime.stop().elapsed(TimeUnit.MILLISECONDS);
groupRunTime.stop();
final long runMillis = groupRunTime.millisElapsed();
emitStat(Stats.CoordinatorRun.GROUP_RUN_TIME, Collections.emptyMap(), runMillis);
log.info("Finished coordinator run for group [%s] in [%d] ms.%n", dutyGroupName, runMillis);
}

View File

@ -277,6 +277,7 @@ public class CoordinatorSimulationBuilder
try {
env.setUp();
coordinator.start();
env.executorFactory.findExecutors();
}
catch (Exception e) {
throw new ISE(e, "Exception while running simulation");
@ -309,8 +310,8 @@ public class CoordinatorSimulationBuilder
verifySimulationRunning();
env.serviceEmitter.flush();
// Invoke historical duties and metadata duties
env.executorFactory.coordinatorRunner.finishNextPendingTasks(2);
// Invoke historical duties
env.executorFactory.historicalDutiesRunner.finishNextPendingTasks(1);
}
@Override
@ -504,7 +505,6 @@ public class CoordinatorSimulationBuilder
inventory.setUp();
coordinatorInventoryView.setUp();
lifecycle.start();
executorFactory.setUp();
leaderSelector.becomeLeader();
EasyMock.replay(mocks.toArray());
}
@ -554,7 +554,7 @@ public class CoordinatorSimulationBuilder
static final String HISTORICAL_LOADER = "historical-loader-%d";
static final String LOAD_QUEUE_EXECUTOR = "load-queue-%d";
static final String LOAD_CALLBACK_EXECUTOR = "load-callback-%d";
static final String COORDINATOR_RUNNER = "Coordinator-Exec--%d";
static final String COORDINATOR_RUNNER = "Coordinator-Exec-HistoricalManagementDuties-%d";
private final Map<String, BlockingExecutorService> blockingExecutors = new HashMap<>();
private final boolean directExecution;
@ -562,7 +562,7 @@ public class CoordinatorSimulationBuilder
private BlockingExecutorService historicalLoader;
private BlockingExecutorService loadQueueExecutor;
private BlockingExecutorService loadCallbackExecutor;
private BlockingExecutorService coordinatorRunner;
private BlockingExecutorService historicalDutiesRunner;
private ExecutorFactory(boolean directExecution)
{
@ -588,9 +588,9 @@ public class CoordinatorSimulationBuilder
return blockingExecutors.get(nameFormat);
}
private void setUp()
private void findExecutors()
{
coordinatorRunner = findExecutor(COORDINATOR_RUNNER);
historicalDutiesRunner = findExecutor(COORDINATOR_RUNNER);
historicalLoader = findExecutor(HISTORICAL_LOADER);
loadQueueExecutor = findExecutor(LOAD_QUEUE_EXECUTOR);
loadCallbackExecutor = findExecutor(LOAD_CALLBACK_EXECUTOR);