From 07a193a1420b2942e5bc9ac97058438f5435102d Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 21 Aug 2023 15:53:22 +0530 Subject: [PATCH] Use separate executor for each coordinator duty group (#14869) Changes: - Use separate executor for every duty group - This change is thread-safe as every duty group uses its own copy of `DruidCoordinatorRuntimeParams` and does not share any other mutable instances with other duty groups. - With the exception of `HistoricalManagementDuties`, duty groups are typically not very compute intensive and mostly perform database or HTTP I/O. So, coordinator resources would still mostly be available for `HistoricalManagementDuties`. --- .../server/coordinator/DruidCoordinator.java | 42 +++++++++++++------ .../CoordinatorSimulationBuilder.java | 14 +++---- 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index a7b8417351a..363a5758d09 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -20,12 +20,12 @@ package org.apache.druid.server.coordinator; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.inject.Inject; import it.unimi.dsi.fastutil.objects.Object2IntMap; import it.unimi.dsi.fastutil.objects.Object2IntMaps; @@ -46,6 +46,7 @@ import org.apache.druid.guice.annotations.CoordinatorIndexingServiceDuty; import 
org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty; import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; @@ -99,7 +100,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -141,7 +141,8 @@ public class DruidCoordinator private final ServiceEmitter emitter; private final OverlordClient overlordClient; - private final ScheduledExecutorService exec; + private final ScheduledExecutorFactory executorFactory; + private final Map dutyGroupExecutors = new HashMap<>(); private final LoadQueueTaskMaster taskMaster; private final ConcurrentHashMap loadManagementPeons = new ConcurrentHashMap<>(); private final SegmentLoadQueueManager loadQueueManager; @@ -216,7 +217,7 @@ public class DruidCoordinator this.metadataStoreManagementDuties = metadataStoreManagementDuties; this.customDutyGroups = customDutyGroups; - this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d"); + this.executorFactory = scheduledExecutorFactory; this.balancerStrategyFactory = balancerStrategyFactory; this.lookupCoordinatorManager = lookupCoordinatorManager; @@ -395,7 +396,7 @@ public class DruidCoordinator started = false; - exec.shutdownNow(); + stopAllDutyGroupExecutors(); if (balancerExec != null) { balancerExec.shutdownNow(); @@ -492,12 +493,10 @@ public class DruidCoordinator } for (final DutiesRunnable dutiesRunnable : dutiesRunnables) { - // CompactSegmentsDuty can takes a non trival amount of time to complete. 
- // Hence, we schedule at fixed rate to make sure the other tasks still run at approximately every - // config.getCoordinatorIndexingPeriod() period. Note that cautious should be taken - // if setting config.getCoordinatorIndexingPeriod() lower than the default value. + // Several coordinator duties can take a non-trivial amount of time to complete. + // Hence, we schedule each duty group on a dedicated executor. ScheduledExecutors.scheduleAtFixedRate( - exec, + getOrCreateDutyGroupExecutor(dutiesRunnable.dutyGroupName), config.getCoordinatorStartDelay(), dutiesRunnable.getPeriod(), () -> { @@ -544,6 +543,22 @@ public class DruidCoordinator } } + @GuardedBy("lock") + private ScheduledExecutorService getOrCreateDutyGroupExecutor(String dutyGroup) + { + return dutyGroupExecutors.computeIfAbsent( + dutyGroup, + group -> executorFactory.create(1, "Coordinator-Exec-" + dutyGroup + "-%d") + ); + } + + @GuardedBy("lock") + private void stopAllDutyGroupExecutors() + { + dutyGroupExecutors.values().forEach(ScheduledExecutorService::shutdownNow); + dutyGroupExecutors.clear(); + } + /** * Resets the balancerExec if required and creates a new BalancerStrategy for * the current coordinator run. 
@@ -733,7 +748,7 @@ public class DruidCoordinator && coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) { - dutyRunTime.reset().start(); + dutyRunTime.restart(); params = duty.run(params); dutyRunTime.stop(); @@ -743,7 +758,7 @@ public class DruidCoordinator return; } else { final RowKey rowKey = RowKey.of(Dimension.DUTY, dutyName); - final long dutyRunMillis = dutyRunTime.elapsed(TimeUnit.MILLISECONDS); + final long dutyRunMillis = dutyRunTime.millisElapsed(); params.getCoordinatorStats().add(Stats.CoordinatorRun.DUTY_RUN_TIME, rowKey, dutyRunMillis); } } @@ -769,7 +784,8 @@ public class DruidCoordinator } // Emit the runtime of the full DutiesRunnable - final long runMillis = groupRunTime.stop().elapsed(TimeUnit.MILLISECONDS); + groupRunTime.stop(); + final long runMillis = groupRunTime.millisElapsed(); emitStat(Stats.CoordinatorRun.GROUP_RUN_TIME, Collections.emptyMap(), runMillis); log.info("Finished coordinator run for group [%s] in [%d] ms.%n", dutyGroupName, runMillis); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java index e40bf8ee168..5ad7ffab36b 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java @@ -277,6 +277,7 @@ public class CoordinatorSimulationBuilder try { env.setUp(); coordinator.start(); + env.executorFactory.findExecutors(); } catch (Exception e) { throw new ISE(e, "Exception while running simulation"); @@ -309,8 +310,8 @@ public class CoordinatorSimulationBuilder verifySimulationRunning(); env.serviceEmitter.flush(); - // Invoke historical duties and metadata duties - env.executorFactory.coordinatorRunner.finishNextPendingTasks(2); + // Invoke historical duties + 
env.executorFactory.historicalDutiesRunner.finishNextPendingTasks(1); } @Override @@ -504,7 +505,6 @@ public class CoordinatorSimulationBuilder inventory.setUp(); coordinatorInventoryView.setUp(); lifecycle.start(); - executorFactory.setUp(); leaderSelector.becomeLeader(); EasyMock.replay(mocks.toArray()); } @@ -554,7 +554,7 @@ public class CoordinatorSimulationBuilder static final String HISTORICAL_LOADER = "historical-loader-%d"; static final String LOAD_QUEUE_EXECUTOR = "load-queue-%d"; static final String LOAD_CALLBACK_EXECUTOR = "load-callback-%d"; - static final String COORDINATOR_RUNNER = "Coordinator-Exec--%d"; + static final String COORDINATOR_RUNNER = "Coordinator-Exec-HistoricalManagementDuties-%d"; private final Map blockingExecutors = new HashMap<>(); private final boolean directExecution; @@ -562,7 +562,7 @@ public class CoordinatorSimulationBuilder private BlockingExecutorService historicalLoader; private BlockingExecutorService loadQueueExecutor; private BlockingExecutorService loadCallbackExecutor; - private BlockingExecutorService coordinatorRunner; + private BlockingExecutorService historicalDutiesRunner; private ExecutorFactory(boolean directExecution) { @@ -588,9 +588,9 @@ public class CoordinatorSimulationBuilder return blockingExecutors.get(nameFormat); } - private void setUp() + private void findExecutors() { - coordinatorRunner = findExecutor(COORDINATOR_RUNNER); + historicalDutiesRunner = findExecutor(COORDINATOR_RUNNER); historicalLoader = findExecutor(HISTORICAL_LOADER); loadQueueExecutor = findExecutor(LOAD_QUEUE_EXECUTOR); loadCallbackExecutor = findExecutor(LOAD_CALLBACK_EXECUTOR);