From cb87e4dc927731e32b0bbcf678bb5600835ff28d Mon Sep 17 00:00:00 2001 From: Sunil G Date: Wed, 13 Dec 2017 22:49:58 +0530 Subject: [PATCH] YARN-7643. Handle recovery of applications in case of auto-created leaf queue mapping. Contributed by Suma Shivaprasad. --- .../server/resourcemanager/RMAppManager.java | 26 ++- .../UserGroupMappingPlacementRule.java | 2 + .../scheduler/capacity/AbstractCSQueue.java | 2 +- .../scheduler/capacity/CapacityScheduler.java | 95 +++++++---- .../TestWorkPreservingRMRestart.java | 156 ++++++++++++++++++ ...CapacitySchedulerAutoCreatedQueueBase.java | 46 +++++- 6 files changed, 275 insertions(+), 52 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index b21fb737f52..5ea11525ae0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -366,22 +366,20 @@ public class RMAppManager implements EventHandler, String user, boolean isRecovery, long startTime) throws YarnException { ApplicationPlacementContext placementContext = null; + try { + placementContext = placeApplication(rmContext, submissionContext, user); + } catch (YarnException e) { + String msg = + "Failed to place application " + submissionContext.getApplicationId() + + " to queue and specified " + "queue is invalid : " + + submissionContext.getQueue(); + LOG.error(msg, e); + throw e; + } - // We only do queue mapping when it's a new application + // We only replace the queue when it's a new 
application if (!isRecovery) { - try { - // Do queue mapping - placementContext = placeApplication(rmContext, - submissionContext, user); - replaceQueueFromPlacementContext(placementContext, - submissionContext); - } catch (YarnException e) { - String msg = "Failed to place application " + - submissionContext.getApplicationId() + " to queue and specified " - + "queue is invalid : " + submissionContext.getQueue(); - LOG.error(msg, e); - throw e; - } + replaceQueueFromPlacementContext(placementContext, submissionContext); // fail the submission if configured application timeout value is invalid RMServerUtils.validateApplicationTimeouts( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java index 9901f4a1e0b..d03b832ad8b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java @@ -184,6 +184,8 @@ public class UserGroupMappingPlacementRule extends PlacementRule { if (mappedQueue != null) { // We have a mapping, should we use it? 
if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) + //queueName will be same as mapped queue name in case of recovery + || queueName.equals(mappedQueue.getQueue()) || overrideWithQueueMappings) { LOG.info("Application " + applicationId + " user " + user + " mapping [" + queueName + "] to [" + mappedQueue diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 4df4cf23d13..9afbdd5957c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -147,7 +147,7 @@ public abstract class AbstractCSQueue implements CSQueue { this.metrics = old != null ? 
(CSQueueMetrics) old.getMetrics() : CSQueueMetrics.forQueue(getQueuePath(), parent, - configuration.getEnableUserMetrics(), cs.getConf()); + cs.getConfiguration().getEnableUserMetrics(), cs.getConf()); this.csContext = cs; this.minimumAllocation = csContext.getMinimumResourceCapability(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 8de363140fb..000f59cfa28 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -650,24 +650,28 @@ public class CapacityScheduler extends return this.queueManager.getQueue(queueName); } - private void addApplicationOnRecovery( - ApplicationId applicationId, String queueName, String user, - Priority priority) { + private void addApplicationOnRecovery(ApplicationId applicationId, + String queueName, String user, + Priority priority, ApplicationPlacementContext placementContext) { try { writeLock.lock(); - CSQueue queue = getQueue(queueName); + //check if the queue needs to be auto-created during recovery + CSQueue queue = getOrCreateQueueFromPlacementContext(applicationId, user, + queueName, placementContext, true); + if (queue == null) { //During a restart, this indicates a queue was removed, which is //not presently supported if (!YarnConfiguration.shouldRMFailFast(getConfig())) { this.rmContext.getDispatcher().getEventHandler().handle( new 
RMAppEvent(applicationId, RMAppEventType.KILL, - "Application killed on recovery as it was submitted to queue " - + queueName + " which no longer exists after restart.")); + "Application killed on recovery as it" + + " was submitted to queue " + queueName + + " which no longer exists after restart.")); return; } else{ - String queueErrorMsg = "Queue named " + queueName - + " missing during application recovery." + String queueErrorMsg = "Queue named " + queueName + " missing " + + "during application recovery." + " Queue removal during recovery is not presently " + "supported by the capacity scheduler, please " + "restart with all queues configured" @@ -682,8 +686,8 @@ public class CapacityScheduler extends if (!YarnConfiguration.shouldRMFailFast(getConfig())) { this.rmContext.getDispatcher().getEventHandler().handle( new RMAppEvent(applicationId, RMAppEventType.KILL, - "Application killed on recovery as it was submitted to queue " - + queueName + "Application killed on recovery as it was " + + "submitted to queue " + queueName + " which is no longer a leaf queue after restart.")); return; } else{ @@ -719,6 +723,51 @@ public class CapacityScheduler extends } } + private CSQueue getOrCreateQueueFromPlacementContext(ApplicationId + applicationId, String user, String queueName, + ApplicationPlacementContext placementContext, + boolean isRecovery) { + + CSQueue queue = getQueue(queueName); + + if (queue == null) { + if (placementContext != null && placementContext.hasParentQueue()) { + try { + return autoCreateLeafQueue(placementContext); + } catch (YarnException | IOException e) { + if (isRecovery) { + if (!YarnConfiguration.shouldRMFailFast(getConfig())) { + LOG.error("Could not auto-create leaf queue " + queueName + + " due to : ", e); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.KILL, + "Application killed on recovery" + + " as it was submitted to queue " + queueName + + " which could not be 
auto-created")); + } else{ + String queueErrorMsg = + "Queue named " + queueName + " could not be " + + "auto-created during application recovery."; + LOG.fatal(queueErrorMsg, e); + throw new QueueInvalidException(queueErrorMsg); + } + } else{ + LOG.error("Could not auto-create leaf queue due to : ", e); + final String message = + "Application " + applicationId + " submission by user : " + + user + + " to queue : " + queueName + " failed : " + e + .getMessage(); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, + message)); + } + } + } + } + return queue; + } + private void addApplication(ApplicationId applicationId, String queueName, String user, Priority priority, ApplicationPlacementContext placementContext) { @@ -732,23 +781,10 @@ public class CapacityScheduler extends message)); return; } - // Sanity checks. - CSQueue queue = getQueue(queueName); - if (queue == null && placementContext != null) { - //Could be a potential auto-created leaf queue - try { - queue = autoCreateLeafQueue(placementContext); - } catch (YarnException | IOException e) { - LOG.error("Could not auto-create leaf queue due to : ", e); - final String message = - "Application " + applicationId + " submission by user : " + user - + " to queue : " + queueName + " failed : " + e.getMessage(); - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, - message)); - } - } + //Could be a potential auto-created leaf queue + CSQueue queue = getOrCreateQueueFromPlacementContext(applicationId, user, + queueName, placementContext, false); if (queue == null) { final String message = @@ -1534,7 +1570,8 @@ public class CapacityScheduler extends appAddedEvent.getPlacementContext()); } else { addApplicationOnRecovery(appAddedEvent.getApplicationId(), queueName, - appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority()); + appAddedEvent.getUser(), 
appAddedEvent.getApplicatonPriority(), + appAddedEvent.getPlacementContext()); } } } @@ -2058,10 +2095,10 @@ public class CapacityScheduler extends + " (should be set and be a PlanQueue or ManagedParentQueue)"); } - AbstractManagedParentQueue parentPlan = + AbstractManagedParentQueue parent = (AbstractManagedParentQueue) newQueue.getParent(); String queuename = newQueue.getQueueName(); - parentPlan.addChildQueue(newQueue); + parent.addChildQueue(newQueue); this.queueManager.addQueue(queuename, newQueue); LOG.info("Creation of AutoCreatedLeafQueue " + newQueue + " succeeded"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 064e2174e2f..efde7816808 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -64,6 +64,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .TestCapacitySchedulerAutoCreatedQueueBase; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueueMetrics; @@ -97,6 +100,10 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.TestCapacitySchedulerAutoCreatedQueueBase.USER1; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp + .RMWebServices.DEFAULT_QUEUE; import static org.junit.Assert.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -281,6 +288,18 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase } } + private CapacitySchedulerConfiguration + getSchedulerAutoCreatedQueueConfiguration( + boolean overrideWithQueueMappings) throws IOException { + CapacitySchedulerConfiguration schedulerConf = + new CapacitySchedulerConfiguration(conf); + TestCapacitySchedulerAutoCreatedQueueBase + .setupQueueConfigurationForSingleAutoCreatedLeafQueue(schedulerConf); + TestCapacitySchedulerAutoCreatedQueueBase.setupQueueMappings(schedulerConf, + "c", overrideWithQueueMappings, new int[] {0, 1}); + return schedulerConf; + } + // Test work preserving recovery of apps running under reservation. // This involves: // 1. 
Setting up a dynamic reservable queue, @@ -1532,4 +1551,141 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase Assert.assertEquals(RMAppState.FINISHED, recoveredApp.getState()); } + + @Test(timeout = 30000) + public void testDynamicAutoCreatedQueueRecoveryWithDefaultQueue() + throws Exception { + //if queue name is not specified, it should submit to 'default' queue + testDynamicAutoCreatedQueueRecovery(USER1, null); + } + + @Test(timeout = 30000) + public void testDynamicAutoCreatedQueueRecoveryWithOverrideQueueMappingFlag() + throws Exception { + testDynamicAutoCreatedQueueRecovery(USER1, USER1); + } + + // Test work preserving recovery of apps running on auto-created queues. + // This involves: + // 1. Setting up a dynamic auto-created queue, + // 2. Submitting an app to it, + // 3. Failing over RM, + // 4. Validating that the app is recovered post failover, + // 5. Check if all running containers are recovered, + // 6. Verify the scheduler state like attempt info, + // 7. Verify the queue/user metrics for the dynamic auto-created queue. + + public void testDynamicAutoCreatedQueueRecovery(String user, String queueName) + throws Exception { + conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); + conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, + DominantResourceCalculator.class.getName()); + conf.setBoolean(YarnConfiguration.RM_FAIL_FAST, true); + + // 1. Set up dynamic auto-created queue. 
+ CapacitySchedulerConfiguration schedulerConf = null; + if (queueName == null || queueName.equals(DEFAULT_QUEUE)) { + schedulerConf = getSchedulerAutoCreatedQueueConfiguration(false); + } else{ + schedulerConf = getSchedulerAutoCreatedQueueConfiguration(true); + } + int containerMemory = 1024; + Resource containerResource = Resource.newInstance(containerMemory, 1); + + rm1 = new MockRM(schedulerConf); + rm1.start(); + MockNM nm1 = new MockNM("127.0.0.1:1234", 8192, + rm1.getResourceTrackerService()); + nm1.registerNode(); + // 2. submit app to queue which is auto-created. + RMApp app1 = rm1.submitApp(200, "autoCreatedQApp", user, null, queueName); + Resource amResources = app1.getAMResourceRequests().get(0).getCapability(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // clear queue metrics + rm1.clearQueueMetrics(app1); + + // 3. Fail over (restart) RM. + rm2 = new MockRM(schedulerConf, rm1.getRMStateStore()); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + // 4. Validate app is recovered post failover. + RMApp recoveredApp1 = rm2.getRMContext().getRMApps().get( + app1.getApplicationId()); + RMAppAttempt loadedAttempt1 = recoveredApp1.getCurrentAppAttempt(); + NMContainerStatus amContainer = TestRMRestart.createNMContainerStatus( + am1.getApplicationAttemptId(), 1, ContainerState.RUNNING); + NMContainerStatus runningContainer = TestRMRestart.createNMContainerStatus( + am1.getApplicationAttemptId(), 2, ContainerState.RUNNING); + NMContainerStatus completedContainer = + TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3, + ContainerState.COMPLETE); + + nm1.registerNode( + Arrays.asList(amContainer, runningContainer, completedContainer), null); + + // Wait for RM to settle down on recovering containers. 
+ waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId()); + Set launchedContainers = + ((RMNodeImpl) rm2.getRMContext().getRMNodes().get(nm1.getNodeId())) + .getLaunchedContainers(); + assertTrue(launchedContainers.contains(amContainer.getContainerId())); + assertTrue(launchedContainers.contains(runningContainer.getContainerId())); + + // 5. Check RMContainers are re-created and the container state is + // correct. + rm2.waitForState(nm1, amContainer.getContainerId(), + RMContainerState.RUNNING); + rm2.waitForState(nm1, runningContainer.getContainerId(), + RMContainerState.RUNNING); + rm2.waitForContainerToComplete(loadedAttempt1, completedContainer); + + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm2.getResourceScheduler(); + SchedulerNode schedulerNode1 = scheduler.getSchedulerNode(nm1.getNodeId()); + + // ********* check scheduler node state.******* + // 2 running containers. + Resource usedResources = Resources.multiply(containerResource, 2); + Resource nmResource = Resource.newInstance(nm1.getMemory(), + nm1.getvCores()); + + assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId())); + assertTrue( + schedulerNode1.isValidContainer(runningContainer.getContainerId())); + assertFalse( + schedulerNode1.isValidContainer(completedContainer.getContainerId())); + // 2 launched containers, 1 completed container + assertEquals(2, schedulerNode1.getNumContainers()); + + assertEquals(Resources.subtract(nmResource, usedResources), + schedulerNode1.getUnallocatedResource()); + assertEquals(usedResources, schedulerNode1.getAllocatedResource()); + // Resource availableResources = Resources.subtract(nmResource, + // usedResources); + + // 6. Verify the scheduler state like attempt info. + Map> sa = + ((AbstractYarnScheduler) rm2.getResourceScheduler()) + .getSchedulerApplications(); + SchedulerApplication schedulerApp = sa.get( + recoveredApp1.getApplicationId()); + + // 7. 
Verify the queue/user metrics for the dynamic auto-created queue. + if (getSchedulerType() == SchedulerType.CAPACITY) { + checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2); + } + + // *********** check scheduler attempt state.******** + SchedulerApplicationAttempt schedulerAttempt = + schedulerApp.getCurrentAppAttempt(); + assertTrue(schedulerAttempt.getLiveContainers() + .contains(scheduler.getRMContainer(amContainer.getContainerId()))); + assertTrue(schedulerAttempt.getLiveContainers() + .contains(scheduler.getRMContainer(runningContainer.getContainerId()))); + assertEquals(schedulerAttempt.getCurrentConsumption(), usedResources); + + // *********** check appSchedulingInfo state *********** + assertEquals((1L << 40) + 1L, schedulerAttempt.getNewContainerId()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java index d6282a17d79..035c460c833 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java @@ -100,6 +100,7 @@ public class TestCapacitySchedulerAutoCreatedQueueBase { public static final String B = CapacitySchedulerConfiguration.ROOT + ".b"; public static final String C = CapacitySchedulerConfiguration.ROOT + ".c"; public static final String D = 
CapacitySchedulerConfiguration.ROOT + ".d"; + public static final String E = CapacitySchedulerConfiguration.ROOT + ".e"; public static final String A1 = A + ".a1"; public static final String A2 = A + ".a2"; public static final String B1 = B + ".b1"; @@ -129,8 +130,8 @@ public class TestCapacitySchedulerAutoCreatedQueueBase { public static final String USER = "user_"; public static final String USER0 = USER + 0; public static final String USER1 = USER + 1; - public static final String USER3 = USER + 3; public static final String USER2 = USER + 2; + public static final String USER3 = USER + 3; public static final String PARENT_QUEUE = "c"; public static final Set accessibleNodeLabelsOnC = new HashSet<>(); @@ -183,7 +184,7 @@ public class TestCapacitySchedulerAutoCreatedQueueBase { conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); - setupQueueMappings(conf); + setupQueueMappings(conf, PARENT_QUEUE, true, new int[] {0, 1, 2, 3}); dispatcher = new SpyDispatcher(); rmAppEventEventHandler = new SpyDispatcher.SpyRMAppEventHandler(); @@ -225,27 +226,33 @@ public class TestCapacitySchedulerAutoCreatedQueueBase { } public static CapacitySchedulerConfiguration setupQueueMappings( - CapacitySchedulerConfiguration conf) { + CapacitySchedulerConfiguration conf, String parentQueue, boolean + overrideWithQueueMappings, int[] userIds) { List queuePlacementRules = new ArrayList<>(); queuePlacementRules.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE); conf.setQueuePlacementRules(queuePlacementRules); + List existingMappings = conf + .getQueueMappings(); + //set queue mapping List queueMappings = new ArrayList<>(); - for (int i = 0; i <= 3; i++) { + for (int i = 0; i < userIds.length; i++) { //Set C as parent queue name for auto queue creation UserGroupMappingPlacementRule.QueueMapping userQueueMapping = new UserGroupMappingPlacementRule.QueueMapping( UserGroupMappingPlacementRule.QueueMapping.MappingType.USER, - USER + i, 
getQueueMapping(PARENT_QUEUE, USER + i)); + USER + userIds[i], getQueueMapping(parentQueue, USER + + userIds[i])); queueMappings.add(userQueueMapping); } - conf.setQueueMappings(queueMappings); + existingMappings.addAll(queueMappings); + conf.setQueueMappings(existingMappings); //override with queue mappings - conf.setOverrideWithQueueMappings(true); + conf.setOverrideWithQueueMappings(overrideWithQueueMappings); return conf; } @@ -327,6 +334,29 @@ public class TestCapacitySchedulerAutoCreatedQueueBase { return conf; } + public static CapacitySchedulerConfiguration + setupQueueConfigurationForSingleAutoCreatedLeafQueue( + CapacitySchedulerConfiguration conf) { + + //setup new queues with one of them auto enabled + // Define top-level queues + // Set childQueue for root + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] {"c"}); + conf.setCapacity(C, 100f); + + conf.setUserLimitFactor(C, 1.0f); + conf.setAutoCreateChildQueueEnabled(C, true); + + //Setup leaf queue template configs + conf.setAutoCreatedLeafQueueConfigCapacity(C, 100f); + conf.setAutoCreatedLeafQueueConfigMaxCapacity(C, 100.0f); + conf.setAutoCreatedLeafQueueConfigUserLimit(C, 100); + conf.setAutoCreatedLeafQueueConfigUserLimitFactor(C, 3.0f); + + return conf; + } + @After public void tearDown() throws Exception { if (mockRM != null) { @@ -395,7 +425,7 @@ public class TestCapacitySchedulerAutoCreatedQueueBase { conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); - setupQueueMappings(conf); + setupQueueMappings(conf, PARENT_QUEUE, true, new int[] {0, 1, 2, 3}); RMNodeLabelsManager mgr = setupNodeLabelManager(conf); MockRM newMockRM = new MockRM(conf) {