From 8d214cb785724cb930c4938df1bb247a61d33710 Mon Sep 17 00:00:00 2001 From: Szilard Nemeth <954799+szilard-nemeth@users.noreply.github.com> Date: Fri, 17 Dec 2021 00:18:14 +0100 Subject: [PATCH] YARN-10951. CapacityScheduler: Move all fields and initializer code that belongs to async scheduling to a new class (#3800). Contributed by Szilard Nemeth --- .../scheduler/capacity/CapacityScheduler.java | 200 +++++++++++------- .../CapacitySchedulerConfiguration.java | 6 + .../TestCapacitySchedulerAsyncScheduling.java | 2 +- .../capacity/TestCapacitySchedulerPerf.java | 4 +- .../scheduler/capacity/TestLeafQueue.java | 2 +- .../scheduler/capacity/TestReservations.java | 74 +++---- .../scheduler/capacity/TestUtils.java | 11 +- 7 files changed, 182 insertions(+), 117 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index abd40a8062b..16c18bcd5c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -187,9 +187,6 @@ public class CapacityScheduler extends private WorkflowPriorityMappingsManager workflowPriorityMappingsMgr; - // timeout to join when we stop this service - protected final long THREAD_JOIN_TIMEOUT_MS = 1000; - private PreemptionManager preemptionManager = new PreemptionManager(); private volatile boolean isLazyPreemptionEnabled = false; @@ -227,10 +224,7 @@ public class CapacityScheduler extends private ResourceCalculator calculator; private boolean usePortForNodeName; - private boolean scheduleAsynchronously; - @VisibleForTesting - protected List asyncSchedulerThreads; - private ResourceCommitterService resourceCommitterService; + private AsyncSchedulingConfiguration asyncSchedulingConf; private RMNodeLabelsManager labelManager; private AppPriorityACLsManager appPriorityACLManager; private boolean multiNodePlacementEnabled; @@ -238,16 +232,6 @@ public class CapacityScheduler extends private boolean printedVerboseLoggingForAsyncScheduling; private boolean appShouldFailFast; - /** - * EXPERT - */ - private long asyncScheduleInterval; - private static final String ASYNC_SCHEDULER_INTERVAL = - CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX - + ".scheduling-interval-ms"; - private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5; - private long asyncMaxPendingBacklogs; - private CSMaxRunningAppsEnforcer maxRunningEnforcer; public CapacityScheduler() { @@ -376,27 +360,7 @@ public class CapacityScheduler extends } private void initAsyncSchedulingProperties() { - scheduleAsynchronously = this.conf.getScheduleAynschronously(); - asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL, - DEFAULT_ASYNC_SCHEDULER_INTERVAL); - - // number of threads for async scheduling - int maxAsyncSchedulingThreads = this.conf.getInt( - CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, 1); - maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1); - - if (scheduleAsynchronously) { - asyncSchedulerThreads = new ArrayList<>(); - for (int i = 0; i < maxAsyncSchedulingThreads; i++) { - asyncSchedulerThreads.add(new AsyncScheduleThread(this)); - } - resourceCommitterService = new ResourceCommitterService(this); - asyncMaxPendingBacklogs = this.conf.getInt( - CapacitySchedulerConfiguration. - SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS, - CapacitySchedulerConfiguration. - DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS); - } + this.asyncSchedulingConf = new AsyncSchedulingConfiguration(conf, this); } private void initMultiNodePlacement() { @@ -419,8 +383,8 @@ public class CapacityScheduler extends getResourceCalculator().getClass(), getMinimumResourceCapability(), getMaximumResourceCapability(), - scheduleAsynchronously, - asyncScheduleInterval, + asyncSchedulingConf.isScheduleAsynchronously(), + asyncSchedulingConf.getAsyncScheduleInterval(), multiNodePlacementEnabled, assignMultipleEnabled, maxAssignPerHeartbeat, @@ -431,15 +395,7 @@ public class CapacityScheduler extends writeLock.lock(); try { activitiesManager.start(); - if (scheduleAsynchronously) { - Preconditions.checkNotNull(asyncSchedulerThreads, - "asyncSchedulerThreads is null"); - for (Thread t : asyncSchedulerThreads) { - t.start(); - } - - resourceCommitterService.start(); - } + asyncSchedulingConf.startThreads(); } finally { writeLock.unlock(); } @@ -465,14 +421,7 @@ public class CapacityScheduler extends writeLock.lock(); try { this.activitiesManager.stop(); - if (scheduleAsynchronously && asyncSchedulerThreads != null) { - for (Thread t : asyncSchedulerThreads) { - t.interrupt(); - t.join(THREAD_JOIN_TIMEOUT_MS); - } - resourceCommitterService.interrupt(); - resourceCommitterService.join(THREAD_JOIN_TIMEOUT_MS); - } + asyncSchedulingConf.serviceStopInvoked(); } finally { writeLock.unlock(); } @@ -539,7 +488,7 @@ public class CapacityScheduler extends } long getAsyncScheduleInterval() { - return asyncScheduleInterval; + return asyncSchedulingConf.getAsyncScheduleInterval(); } private final static Random random = new Random(System.currentTimeMillis()); @@ -671,6 +620,11 @@ public class CapacityScheduler extends Thread.sleep(cs.getAsyncScheduleInterval()); } + @VisibleForTesting + public void setAsyncSchedulingConf(AsyncSchedulingConfiguration conf) { + this.asyncSchedulingConf = conf; + } + static class AsyncScheduleThread extends Thread { private final CapacityScheduler cs; @@ -692,7 +646,7 @@ public class CapacityScheduler extends } else { // Don't run schedule if we have some pending backlogs already if (cs.getAsyncSchedulingPendingBacklogs() - > cs.asyncMaxPendingBacklogs) { + > cs.asyncSchedulingConf.getAsyncMaxPendingBacklogs()) { Thread.sleep(1); } else{ schedule(cs); @@ -1479,7 +1433,7 @@ public class CapacityScheduler extends } // Try to do scheduling - if (!scheduleAsynchronously) { + if (!asyncSchedulingConf.isScheduleAsynchronously()) { writeLock.lock(); try { // reset allocation and reservation stats before we start doing any @@ -2291,8 +2245,8 @@ public class CapacityScheduler extends "Added node " + nodeManager.getNodeAddress() + " clusterResource: " + clusterResource); - if (scheduleAsynchronously && getNumClusterNodes() == 1) { - for (AsyncScheduleThread t : asyncSchedulerThreads) { + if (asyncSchedulingConf.isScheduleAsynchronously() && getNumClusterNodes() == 1) { + for (AsyncScheduleThread t : asyncSchedulingConf.asyncSchedulerThreads) { t.beginSchedule(); } } @@ -2340,11 +2294,7 @@ public class CapacityScheduler extends new ResourceLimits(clusterResource)); int numNodes = nodeTracker.nodeCount(); - if (scheduleAsynchronously && numNodes == 0) { - for (AsyncScheduleThread t : asyncSchedulerThreads) { - t.suspendSchedule(); - } - } + asyncSchedulingConf.nodeRemoved(numNodes); LOG.info( "Removed node " + nodeInfo.getNodeAddress() + " clusterResource: " @@ -3092,9 +3042,9 @@ public class CapacityScheduler extends return; } - if (scheduleAsynchronously) { + if (asyncSchedulingConf.isScheduleAsynchronously()) { // Submit to a commit thread and commit it async-ly - resourceCommitterService.addNewCommitRequest(request); + asyncSchedulingConf.resourceCommitterService.addNewCommitRequest(request); } else{ // Otherwise do it sync-ly. tryCommit(cluster, request, true); @@ -3339,10 +3289,7 @@ public class CapacityScheduler extends } public int getAsyncSchedulingPendingBacklogs() { - if (scheduleAsynchronously) { - return resourceCommitterService.getPendingBacklogs(); - } - return 0; + return asyncSchedulingConf.getPendingBacklogs(); } @Override @@ -3483,7 +3430,7 @@ public class CapacityScheduler extends } public int getNumAsyncSchedulerThreads() { - return asyncSchedulerThreads == null ? 0 : asyncSchedulerThreads.size(); + return asyncSchedulingConf.getNumAsyncSchedulerThreads(); } @VisibleForTesting @@ -3503,4 +3450,109 @@ public class CapacityScheduler extends public void setQueueManager(CapacitySchedulerQueueManager qm) { this.queueManager = qm; } + + @VisibleForTesting + public List getAsyncSchedulerThreads() { + return asyncSchedulingConf.getAsyncSchedulerThreads(); + } + + static class AsyncSchedulingConfiguration { + // timeout to join when we stop this service + private static final long THREAD_JOIN_TIMEOUT_MS = 1000; + + @VisibleForTesting + protected List asyncSchedulerThreads; + private ResourceCommitterService resourceCommitterService; + + private long asyncScheduleInterval; + private static final String ASYNC_SCHEDULER_INTERVAL = + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX + + ".scheduling-interval-ms"; + private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5; + private long asyncMaxPendingBacklogs; + + private final boolean scheduleAsynchronously; + + AsyncSchedulingConfiguration(CapacitySchedulerConfiguration conf, + CapacityScheduler cs) { + this.scheduleAsynchronously = conf.getScheduleAynschronously(); + if (this.scheduleAsynchronously) { + this.asyncScheduleInterval = conf.getLong( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_INTERVAL, + CapacitySchedulerConfiguration.DEFAULT_SCHEDULE_ASYNCHRONOUSLY_INTERVAL); + // number of threads for async scheduling + int maxAsyncSchedulingThreads = conf.getInt( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, + 1); + maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1); + this.asyncMaxPendingBacklogs = conf.getInt( + CapacitySchedulerConfiguration. + SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS, + CapacitySchedulerConfiguration. + DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS); + + this.asyncSchedulerThreads = new ArrayList<>(); + for (int i = 0; i < maxAsyncSchedulingThreads; i++) { + asyncSchedulerThreads.add(new AsyncScheduleThread(cs)); + } + this.resourceCommitterService = new ResourceCommitterService(cs); + } + } + public boolean isScheduleAsynchronously() { + return scheduleAsynchronously; + } + public long getAsyncScheduleInterval() { + return asyncScheduleInterval; + } + public long getAsyncMaxPendingBacklogs() { + return asyncMaxPendingBacklogs; + } + + public void startThreads() { + if (scheduleAsynchronously) { + Preconditions.checkNotNull(asyncSchedulerThreads, + "asyncSchedulerThreads is null"); + for (Thread t : asyncSchedulerThreads) { + t.start(); + } + + resourceCommitterService.start(); + } + } + + public void serviceStopInvoked() throws InterruptedException { + if (scheduleAsynchronously && asyncSchedulerThreads != null) { + for (Thread t : asyncSchedulerThreads) { + t.interrupt(); + t.join(THREAD_JOIN_TIMEOUT_MS); + } + resourceCommitterService.interrupt(); + resourceCommitterService.join(THREAD_JOIN_TIMEOUT_MS); + } + } + + public void nodeRemoved(int numNodes) { + if (scheduleAsynchronously && numNodes == 0) { + for (AsyncScheduleThread t : asyncSchedulerThreads) { + t.suspendSchedule(); + } + } + } + + public int getPendingBacklogs() { + if (scheduleAsynchronously) { + return resourceCommitterService.getPendingBacklogs(); + } + return 0; + } + + public int getNumAsyncSchedulerThreads() { + return asyncSchedulerThreads == null ? 0 : asyncSchedulerThreads.size(); + } + + @VisibleForTesting + public List getAsyncSchedulerThreads() { + return asyncSchedulerThreads; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 2716ddebbdc..628c58576e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -273,6 +273,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur public static final String SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS = SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".maximum-pending-backlogs"; + @Private + public static final String SCHEDULE_ASYNCHRONOUSLY_INTERVAL = + SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".scheduling-interval-ms"; + @Private + public static final long DEFAULT_SCHEDULE_ASYNCHRONOUSLY_INTERVAL = 5; + @Private public static final String APP_FAIL_FAST = PREFIX + "application.fail-fast"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java index 98214a030c9..b36e0edc735 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java @@ -153,7 +153,7 @@ public class TestCapacitySchedulerAsyncScheduling { CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); for (CapacityScheduler.AsyncScheduleThread thread : - cs.asyncSchedulerThreads) { + cs.getAsyncSchedulerThreads()) { Assert.assertTrue(thread.getName() .startsWith("AsyncCapacitySchedulerThread")); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java index b71fe063927..b8209a54952 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java @@ -237,7 +237,7 @@ public class TestCapacitySchedulerPerf { if (numThreads > 0) { // disable async scheduling threads - for (CapacityScheduler.AsyncScheduleThread t : cs.asyncSchedulerThreads) { + for (CapacityScheduler.AsyncScheduleThread t : cs.getAsyncSchedulerThreads()) { t.suspendSchedule(); } } @@ -268,7 +268,7 @@ public class TestCapacitySchedulerPerf { if (numThreads > 0) { // enable async scheduling threads - for (CapacityScheduler.AsyncScheduleThread t : cs.asyncSchedulerThreads) { + for (CapacityScheduler.AsyncScheduleThread t : cs.getAsyncSchedulerThreads()) { t.beginSchedule(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 1da7ce18ee0..eca065b1487 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -936,7 +936,7 @@ public class TestLeafQueue { LeafQueue q, final Map nodes, final Map apps) throws IOException { - TestUtils.applyResourceCommitRequest(clusterResource, assign, nodes, apps); + TestUtils.applyResourceCommitRequest(clusterResource, assign, nodes, apps, csConf); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index 53b1d160dc2..c6f947febc0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -291,7 +291,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(2 * GB, a.getUsedResources().getMemorySize()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -305,7 +305,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(5 * GB, a.getUsedResources().getMemorySize()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -319,7 +319,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(8 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -337,7 +337,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(13 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -356,7 +356,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(18 * GB, a.getUsedResources().getMemorySize()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -376,7 +376,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(18 * GB, a.getUsedResources().getMemorySize()); assertEquals(18 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -477,7 +477,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(2 * GB, a.getUsedResources().getMemorySize()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, app_1.getCurrentConsumption().getMemorySize()); @@ -491,7 +491,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(4 * GB, a.getUsedResources().getMemorySize()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(2 * GB, app_1.getCurrentConsumption().getMemorySize()); @@ -514,7 +514,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(12 * GB, a.getUsedResources().getMemorySize()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(2 * GB, app_1.getCurrentConsumption().getMemorySize()); @@ -530,7 +530,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(14 * GB, a.getUsedResources().getMemorySize()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(4 * GB, app_1.getCurrentConsumption().getMemorySize()); @@ -628,7 +628,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(2 * GB, a.getUsedResources().getMemorySize()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -642,7 +642,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(5 * GB, a.getUsedResources().getMemorySize()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -656,7 +656,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(8 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -674,7 +674,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(13 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -693,7 +693,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(18 * GB, a.getUsedResources().getMemorySize()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -713,7 +713,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(18 * GB, a.getUsedResources().getMemorySize()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -811,7 +811,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(2 * GB, a.getUsedResources().getMemorySize()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -824,7 +824,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(5 * GB, a.getUsedResources().getMemorySize()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -837,7 +837,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(8 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -854,7 +854,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(13 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -872,7 +872,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(13 * GB, a.getUsedResources().getMemorySize()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1102,7 +1102,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(2 * GB, a.getUsedResources().getMemorySize()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1115,7 +1115,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(5 * GB, a.getUsedResources().getMemorySize()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1128,7 +1128,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(8 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1144,7 +1144,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(13 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -1293,7 +1293,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(2 * GB, a.getUsedResources().getMemorySize()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1306,7 +1306,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(5 * GB, a.getUsedResources().getMemorySize()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1319,7 +1319,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(8 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1335,7 +1335,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(13 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(5 * GB, app_0.getCurrentReservation().getMemorySize()); @@ -1462,7 +1462,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(2 * GB, a.getUsedResources().getMemorySize()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1476,7 +1476,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(5 * GB, a.getUsedResources().getMemorySize()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1490,7 +1490,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(8 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1508,7 +1508,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_0, new ResourceLimits(Resources.createResource(10 * GB)), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(8 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1526,7 +1526,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_2, new ResourceLimits(Resources.createResource(10 * GB)), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(8 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1542,7 +1542,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(13 * GB, a.getUsedResources().getMemorySize()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1557,7 +1557,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(21 * GB, a.getUsedResources().getMemorySize()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(8 * GB, a.getMetrics().getReservedMB()); @@ -1574,7 +1574,7 @@ public class TestReservations { TestUtils.applyResourceCommitRequest(clusterResource, a.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource), - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps, csConf); assertEquals(21 * GB, a.getUsedResources().getMemorySize()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(8 * GB, a.getMetrics().getReservedMB()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 026206ac38f..28ca66847de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -447,7 +447,14 @@ public class TestUtils { public static void applyResourceCommitRequest(Resource clusterResource, CSAssignment csAssignment, final Map nodes, - final Map apps) + final Map apps) throws IOException { + applyResourceCommitRequest(clusterResource, csAssignment, nodes, apps, null); + } + + public static void applyResourceCommitRequest(Resource clusterResource, + CSAssignment csAssignment, + final Map nodes, + final Map apps, CapacitySchedulerConfiguration csConf) throws IOException { CapacityScheduler cs = new CapacityScheduler() { @Override @@ -461,7 +468,7 @@ public class TestUtils { return apps.get(applicationAttemptId); } }; - + cs.setAsyncSchedulingConf(new CapacityScheduler.AsyncSchedulingConfiguration(csConf, cs)); cs.setResourceCalculator(new DefaultResourceCalculator()); cs.submitResourceCommitRequest(clusterResource,