From 66ac476b4823df351cbaab0971da0adc7d08c242 Mon Sep 17 00:00:00 2001 From: Szilard Nemeth <954799+szilard-nemeth@users.noreply.github.com> Date: Wed, 27 Oct 2021 17:13:49 +0200 Subject: [PATCH] YARN-10924. Clean up CapacityScheduler#initScheduler (#3581) Contributed by Szilard Nemeth --- .../scheduler/capacity/CapacityScheduler.java | 165 ++++++++++-------- 1 file changed, 89 insertions(+), 76 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 592cb5aedab..b8091c7e05c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -302,38 +302,13 @@ public class CapacityScheduler extends IOException, YarnException { writeLock.lock(); try { - String confProviderStr = configuration.get( - YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS, - YarnConfiguration.DEFAULT_CONFIGURATION_STORE); - switch (confProviderStr) { - case YarnConfiguration.FILE_CONFIGURATION_STORE: - this.csConfProvider = - new FileBasedCSConfigurationProvider(rmContext); - break; - case YarnConfiguration.MEMORY_CONFIGURATION_STORE: - case YarnConfiguration.LEVELDB_CONFIGURATION_STORE: - case YarnConfiguration.ZK_CONFIGURATION_STORE: - case YarnConfiguration.FS_CONFIGURATION_STORE: - this.csConfProvider = new MutableCSConfigurationProvider(rmContext); - break; - default: - throw new IOException("Invalid configuration store class: " + - confProviderStr); - } + this.csConfProvider = getCsConfProvider(configuration); this.csConfProvider.init(configuration); this.conf = this.csConfProvider.loadConfiguration(configuration); validateConf(this.conf); this.minimumAllocation = super.getMinimumAllocation(); initMaximumResourceCapability(super.getMaximumAllocation()); - this.calculator = this.conf.getResourceCalculator(); - if (this.calculator instanceof DefaultResourceCalculator - && ResourceUtils.getNumberOfKnownResourceTypes() > 2) { - throw new YarnRuntimeException("RM uses DefaultResourceCalculator which" - + " used only memory as resource-type but invalid resource-types" - + " specified " + ResourceUtils.getResourceTypes() + ". Use" - + " DominantResourceCalculator instead to make effective use of" - + " these resource-types"); - } + this.calculator = initResourceCalculator(); this.usePortForNodeName = this.conf.getUsePortForNodeName(); this.applications = new ConcurrentHashMap<>(); this.labelManager = rmContext.getNodeLabelManager(); @@ -341,71 +316,109 @@ public class CapacityScheduler extends this.queueManager = new CapacitySchedulerQueueManager(yarnConf, this.labelManager, this.appPriorityACLManager); this.queueManager.setCapacitySchedulerContext(this); - this.workflowPriorityMappingsMgr = new WorkflowPriorityMappingsManager(); - this.activitiesManager = new ActivitiesManager(rmContext); activitiesManager.init(conf); initializeQueues(this.conf); this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled(); - - scheduleAsynchronously = this.conf.getScheduleAynschronously(); - asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL, - DEFAULT_ASYNC_SCHEDULER_INTERVAL); - this.assignMultipleEnabled = this.conf.getAssignMultipleEnabled(); this.maxAssignPerHeartbeat = this.conf.getMaxAssignPerHeartbeat(); - - this.appShouldFailFast = CapacitySchedulerConfiguration.shouldAppFailFast( - getConfig()); - - // number of threads for async scheduling - int maxAsyncSchedulingThreads = this.conf.getInt( - CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, - 1); - maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1); - - if (scheduleAsynchronously) { - asyncSchedulerThreads = new ArrayList<>(); - for (int i = 0; i < maxAsyncSchedulingThreads; i++) { - asyncSchedulerThreads.add(new AsyncScheduleThread(this)); - } - resourceCommitterService = new ResourceCommitterService(this); - asyncMaxPendingBacklogs = this.conf.getInt( - CapacitySchedulerConfiguration. - SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS, - CapacitySchedulerConfiguration. - DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS); - } + this.appShouldFailFast = CapacitySchedulerConfiguration.shouldAppFailFast(getConfig()); + initAsyncSchedulingProperties(); // Setup how many containers we can allocate for each round offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit(); - // Register CS specific multi-node policies to common MultiNodeManager - // which will add to a MultiNodeSorter which gives a pre-sorted list of - // nodes to scheduler's allocation. - multiNodePlacementEnabled = this.conf.getMultiNodePlacementEnabled(); - if(rmContext.getMultiNodeSortingManager() != null) { - rmContext.getMultiNodeSortingManager().registerMultiNodePolicyNames( - multiNodePlacementEnabled, - this.conf.getMultiNodePlacementPolicies()); - } - - LOG.info("Initialized CapacityScheduler with " + "calculator=" - + getResourceCalculator().getClass() + ", " + "minimumAllocation=" - + getMinimumResourceCapability() + ", " + "maximumAllocation=" - + getMaximumResourceCapability() + ", " + "asynchronousScheduling=" - + scheduleAsynchronously + ", " + "asyncScheduleInterval=" - + asyncScheduleInterval + "ms" + ",multiNodePlacementEnabled=" - + multiNodePlacementEnabled + ", " + "assignMultipleEnabled=" - + assignMultipleEnabled + ", " + "maxAssignPerHeartbeat=" - + maxAssignPerHeartbeat + ", " + "offswitchPerHeartbeatLimit=" - + offswitchPerHeartbeatLimit); + initMultiNodePlacement(); + printSchedulerInitialized(); } finally { writeLock.unlock(); } } + private CSConfigurationProvider getCsConfProvider(Configuration configuration) + throws IOException { + String confProviderStr = configuration.get( + YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS, + YarnConfiguration.DEFAULT_CONFIGURATION_STORE); + switch (confProviderStr) { + case YarnConfiguration.FILE_CONFIGURATION_STORE: + return new FileBasedCSConfigurationProvider(rmContext); + case YarnConfiguration.MEMORY_CONFIGURATION_STORE: + case YarnConfiguration.LEVELDB_CONFIGURATION_STORE: + case YarnConfiguration.ZK_CONFIGURATION_STORE: + case YarnConfiguration.FS_CONFIGURATION_STORE: + return new MutableCSConfigurationProvider(rmContext); + default: + throw new IOException("Invalid configuration store class: " + confProviderStr); + } + } + + private ResourceCalculator initResourceCalculator() { + ResourceCalculator resourceCalculator = this.conf.getResourceCalculator(); + if (resourceCalculator instanceof DefaultResourceCalculator + && ResourceUtils.getNumberOfKnownResourceTypes() > 2) { + throw new YarnRuntimeException("RM uses DefaultResourceCalculator which" + + " used only memory as resource-type but invalid resource-types" + + " specified " + ResourceUtils.getResourceTypes() + ". Use" + + " DominantResourceCalculator instead to make effective use of" + + " these resource-types"); + } + return resourceCalculator; + } + + private void initAsyncSchedulingProperties() { + scheduleAsynchronously = this.conf.getScheduleAynschronously(); + asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL, + DEFAULT_ASYNC_SCHEDULER_INTERVAL); + + // number of threads for async scheduling + int maxAsyncSchedulingThreads = this.conf.getInt( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, 1); + maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1); + + if (scheduleAsynchronously) { + asyncSchedulerThreads = new ArrayList<>(); + for (int i = 0; i < maxAsyncSchedulingThreads; i++) { + asyncSchedulerThreads.add(new AsyncScheduleThread(this)); + } + resourceCommitterService = new ResourceCommitterService(this); + asyncMaxPendingBacklogs = this.conf.getInt( + CapacitySchedulerConfiguration. + SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS, + CapacitySchedulerConfiguration. + DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS); + } + } + + private void initMultiNodePlacement() { + // Register CS specific multi-node policies to common MultiNodeManager + // which will add to a MultiNodeSorter which gives a pre-sorted list of + // nodes to scheduler's allocation. + multiNodePlacementEnabled = this.conf.getMultiNodePlacementEnabled(); + if (rmContext.getMultiNodeSortingManager() != null) { + rmContext.getMultiNodeSortingManager().registerMultiNodePolicyNames( + multiNodePlacementEnabled, + this.conf.getMultiNodePlacementPolicies()); + } + } + + private void printSchedulerInitialized() { + LOG.info("Initialized CapacityScheduler with calculator={}, minimumAllocation={}, " + + "maximumAllocation={}, asynchronousScheduling={}, asyncScheduleInterval={} ms, " + + "multiNodePlacementEnabled={}, assignMultipleEnabled={}, maxAssignPerHeartbeat={}, " + + "offswitchPerHeartbeatLimit={}", + getResourceCalculator().getClass(), + getMinimumResourceCapability(), + getMaximumResourceCapability(), + scheduleAsynchronously, + asyncScheduleInterval, + multiNodePlacementEnabled, + assignMultipleEnabled, + maxAssignPerHeartbeat, + offswitchPerHeartbeatLimit); + } + private void startSchedulerThreads() { writeLock.lock(); try {