diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 4fe0c648770..cb6b64485d9 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -29,6 +29,9 @@ Release 2.6.0 - UNRELEASED YARN-2378. Added support for moving applications across queues in CapacityScheduler. (Subramaniam Venkatraman Krishnan via jianhe) + YARN-2411. Support simple user and group mappings to queues. (Ram Venkatesh + via jianhe) + IMPROVEMENTS YARN-2242. Improve exception information on AM launch crashes. (Li Lu diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml index 2bed464bcc6..30f4eb96c6c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml @@ -108,4 +108,27 @@ + + yarn.scheduler.capacity.queue-mappings + + + A list of mappings that will be used to assign jobs to queues + The syntax for this list is [u|g]:[name]:[queue_name][,next mapping]* + Typically this list will be used to map users to queues, + for example, u:%user:%user maps all users to queues with the same name + as the user. + + + + + yarn.scheduler.capacity.queue-mappings-override.enable + false + + If a queue mapping is present, will it override the value specified + by the user? This can be used by administrators to place jobs in queues + that are different than the one specified by the user. + The default is false. + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 1b31a1a6bbb..e979c5c2320 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -18,8 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import com.google.common.base.Preconditions; - import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -41,6 +39,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -59,10 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -77,6 +73,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -94,6 +92,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; @LimitedPrivate("yarn") @Evolving @@ -199,6 +198,16 @@ public Configuration getConf() { + ".scheduling-interval-ms"; private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5; + private boolean overrideWithQueueMappings = false; + private List mappings = new ArrayList(); + private Groups groups; + + @VisibleForTesting + public synchronized String getMappedQueueForTest(String user) + throws IOException { + return getMappedQueue(user); + } + public CapacityScheduler() { super(CapacityScheduler.class.getName()); } @@ -263,7 +272,6 @@ private synchronized void initScheduler(Configuration configuration) throws this.applications = new ConcurrentHashMap>(); - initializeQueues(this.conf); scheduleAsynchronously = this.conf.getScheduleAynschronously(); @@ -402,7 +410,32 @@ public CSQueue hook(CSQueue queue) { } } private static final QueueHook noop = new QueueHook(); - + + private void initializeQueueMappings() throws IOException { + overrideWithQueueMappings = conf.getOverrideWithQueueMappings(); + LOG.info("Initialized queue mappings, override: " + + overrideWithQueueMappings); + // Get new user/group mappings + List newMappings = conf.getQueueMappings(); + //check if mappings refer to valid queues + for (QueueMapping mapping : newMappings) { + if (!mapping.queue.equals(CURRENT_USER_MAPPING) && + !mapping.queue.equals(PRIMARY_GROUP_MAPPING)) { + CSQueue queue = queues.get(mapping.queue); + if (queue == null || !(queue instanceof LeafQueue)) { + throw new IOException( + "mapping contains invalid or non-leaf queue " + mapping.queue); + } + } + } + //apply the new mappings since they are valid + mappings = newMappings; + // initialize groups if mappings are present + if (mappings.size() > 0) { + groups = new Groups(conf); + } + } + @Lock(CapacityScheduler.class) private void initializeQueues(CapacitySchedulerConfiguration conf) throws IOException { @@ -410,7 +443,9 @@ private void initializeQueues(CapacitySchedulerConfiguration conf) root = parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, noop); + LOG.info("Initialized root queue " + root); + initializeQueueMappings(); } @Lock(CapacityScheduler.class) @@ -430,6 +465,7 @@ private void reinitializeQueues(CapacitySchedulerConfiguration conf) // Re-configure queues root.reinitialize(newRoot, clusterResource); + initializeQueueMappings(); } /** @@ -517,12 +553,73 @@ static CSQueue parseQueue( } synchronized CSQueue getQueue(String queueName) { + if (queueName == null) { + return null; + } return queues.get(queueName); } + private static final String CURRENT_USER_MAPPING = "%user"; + + private static final String PRIMARY_GROUP_MAPPING = "%primary_group"; + + private String getMappedQueue(String user) throws IOException { + for (QueueMapping mapping : mappings) { + if (mapping.type == MappingType.USER) { + if (mapping.source.equals(CURRENT_USER_MAPPING)) { + if (mapping.queue.equals(CURRENT_USER_MAPPING)) { + return user; + } + else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) { + return groups.getGroups(user).get(0); + } + else { + return mapping.queue; + } + } + if (user.equals(mapping.source)) { + return mapping.queue; + } + } + if (mapping.type == MappingType.GROUP) { + for (String userGroups : groups.getGroups(user)) { + if (userGroups.equals(mapping.source)) { + return mapping.queue; + } + } + } + } + return null; + } + private synchronized void addApplication(ApplicationId applicationId, - String queueName, String user, boolean isAppRecovering) { - // santiy checks. + String queueName, String user, boolean isAppRecovering) { + + if (mappings != null && mappings.size() > 0) { + try { + String mappedQueue = getMappedQueue(user); + if (mappedQueue != null) { + // We have a mapping, should we use it? + if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) + || overrideWithQueueMappings) { + LOG.info("Application " + applicationId + " user " + user + + " mapping [" + queueName + "] to [" + mappedQueue + + "] override " + overrideWithQueueMappings); + queueName = mappedQueue; + RMApp rmApp = rmContext.getRMApps().get(applicationId); + rmApp.setQueue(queueName); + } + } + } catch (IOException ioex) { + String message = "Failed to submit application " + applicationId + + " submitted by user " + user + " reason: " + ioex.getMessage(); + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppRejectedEvent(applicationId, message)); + return; + } + } + + // sanity checks. CSQueue queue = getQueue(queueName); if (queue == null) { String message = "Application " + applicationId + @@ -902,8 +999,8 @@ public void handle(SchedulerEvent event) { { AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationId(), - appAddedEvent.getQueue(), appAddedEvent.getUser(), - appAddedEvent.getIsAppRecovering()); + appAddedEvent.getQueue(), + appAddedEvent.getUser(), appAddedEvent.getIsAppRecovering()); } break; case APP_REMOVED: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 6fe695ecda2..af6bdc301ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -18,8 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import java.util.HashMap; -import java.util.Map; +import java.util.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -145,6 +144,44 @@ public class CapacitySchedulerConfiguration extends Configuration { @Private public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false; + + @Private + public static final String QUEUE_MAPPING = PREFIX + "queue-mappings"; + + @Private + public static final String ENABLE_QUEUE_MAPPING_OVERRIDE = QUEUE_MAPPING + "-override.enable"; + + @Private + public static final boolean DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE = false; + + @Private + public static class QueueMapping { + + public enum MappingType { + + USER("u"), + GROUP("g"); + private final String type; + private MappingType(String type) { + this.type = type; + } + + public String toString() { + return type; + } + + }; + + MappingType type; + String source; + String queue; + + public QueueMapping(MappingType type, String source, String queue) { + this.type = type; + this.source = source; + this.queue = queue; + } + } public CapacitySchedulerConfiguration() { this(new Configuration()); @@ -378,4 +415,82 @@ public void setScheduleAynschronously(boolean async) { setBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE, async); } + public boolean getOverrideWithQueueMappings() { + return getBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE, + DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE); + } + + /** + * Returns a collection of strings, trimming leading and trailing whitespeace + * on each value + * + * @param str + * String to parse + * @param delim + * delimiter to separate the values + * @return Collection of parsed elements. + */ + private static Collection getTrimmedStringCollection(String str, + String delim) { + List values = new ArrayList(); + if (str == null) + return values; + StringTokenizer tokenizer = new StringTokenizer(str, delim); + while (tokenizer.hasMoreTokens()) { + String next = tokenizer.nextToken(); + if (next == null || next.trim().isEmpty()) { + continue; + } + values.add(next.trim()); + } + return values; + } + + /** + * Get user/group mappings to queues. + * + * @return user/groups mappings or null on illegal configs + */ + public List getQueueMappings() { + List mappings = + new ArrayList(); + Collection mappingsString = + getTrimmedStringCollection(QUEUE_MAPPING); + for (String mappingValue : mappingsString) { + String[] mapping = + getTrimmedStringCollection(mappingValue, ":") + .toArray(new String[] {}); + if (mapping.length != 3 || mapping[1].length() == 0 + || mapping[2].length() == 0) { + throw new IllegalArgumentException( + "Illegal queue mapping " + mappingValue); + } + + QueueMapping m; + try { + QueueMapping.MappingType mappingType; + if (mapping[0].equals("u")) { + mappingType = QueueMapping.MappingType.USER; + } else if (mapping[0].equals("g")) { + mappingType = QueueMapping.MappingType.GROUP; + } else { + throw new IllegalArgumentException( + "unknown mapping prefix " + mapping[0]); + } + m = new QueueMapping( + mappingType, + mapping[1], + mapping[2]); + } catch (Throwable t) { + throw new IllegalArgumentException( + "Illegal queue mapping " + mappingValue); + } + + if (m != null) { + mappings.add(m); + } + } + + return mappings; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java new file mode 100644 index 00000000000..f573f43f067 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.io.IOException; +import java.util.HashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.security.GroupMappingServiceProvider; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping; +import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +public class TestQueueMappings { + + private static final Log LOG = LogFactory.getLog(TestQueueMappings.class); + + private static final String Q1 = "q1"; + private static final String Q2 = "q2"; + + private final static String Q1_PATH = + CapacitySchedulerConfiguration.ROOT + "." + Q1; + private final static String Q2_PATH = + CapacitySchedulerConfiguration.ROOT + "." + Q2; + + private MockRM resourceManager; + + @After + public void tearDown() throws Exception { + if (resourceManager != null) { + LOG.info("Stopping the resource manager"); + resourceManager.stop(); + } + } + + private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { Q1, Q2 }); + + conf.setCapacity(Q1_PATH, 10); + conf.setCapacity(Q2_PATH, 90); + + LOG.info("Setup top-level queues q1 and q2"); + } + + @Test (timeout = 60000) + public void testQueueMapping() throws Exception { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + setupQueueConfiguration(csConf); + YarnConfiguration conf = new YarnConfiguration(csConf); + CapacityScheduler cs = new CapacityScheduler(); + + RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null, + null, new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM(), null); + cs.setConf(conf); + cs.setRMContext(rmContext); + cs.init(conf); + cs.start(); + + conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, + SimpleGroupsMapping.class, GroupMappingServiceProvider.class); + conf.set(CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE, + "true"); + + // configuration parsing tests - negative test cases + checkInvalidQMapping(conf, cs, "x:a:b", "invalid specifier"); + checkInvalidQMapping(conf, cs, "u:a", "no queue specified"); + checkInvalidQMapping(conf, cs, "g:a", "no queue specified"); + checkInvalidQMapping(conf, cs, "u:a:b,g:a", + "multiple mappings with invalid mapping"); + checkInvalidQMapping(conf, cs, "u:a:b,g:a:d:e", "too many path segments"); + checkInvalidQMapping(conf, cs, "u::", "empty source and queue"); + checkInvalidQMapping(conf, cs, "u:", "missing source missing queue"); + checkInvalidQMapping(conf, cs, "u:a:", "empty source missing q"); + + // simple base case for mapping user to queue + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:a:" + Q1); + cs.reinitialize(conf, null); + checkQMapping("a", Q1, cs); + + // group mapping test + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:agroup:" + Q1); + cs.reinitialize(conf, null); + checkQMapping("a", Q1, cs); + + // %user tests + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:%user:" + Q2); + cs.reinitialize(conf, null); + checkQMapping("a", Q2, cs); + + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:%user:%user"); + cs.reinitialize(conf, null); + checkQMapping("a", "a", cs); + + // %primary_group tests + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, + "u:%user:%primary_group"); + cs.reinitialize(conf, null); + checkQMapping("a", "agroup", cs); + + // non-primary group mapping + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, + "g:asubgroup1:" + Q1); + cs.reinitialize(conf, null); + checkQMapping("a", Q1, cs); + + // space trimming + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, " u : a : " + Q1); + cs.reinitialize(conf, null); + checkQMapping("a", Q1, cs); + + csConf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(csConf); + conf = new YarnConfiguration(csConf); + + resourceManager = new MockRM(csConf); + resourceManager.start(); + + conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, + SimpleGroupsMapping.class, GroupMappingServiceProvider.class); + conf.set(CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE, + "true"); + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:user:" + Q1); + resourceManager.getResourceScheduler().reinitialize(conf, null); + + // ensure that if the user specifies a Q that is still overriden + checkAppQueue(resourceManager, "user", Q2, Q1); + + // toggle admin override and retry + conf.setBoolean( + CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE, + false); + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:user:" + Q1); + setupQueueConfiguration(csConf); + resourceManager.getResourceScheduler().reinitialize(conf, null); + + checkAppQueue(resourceManager, "user", Q2, Q2); + + // ensure that if a user does not specify a Q, the user mapping is used + checkAppQueue(resourceManager, "user", null, Q1); + + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:usergroup:" + Q2); + setupQueueConfiguration(csConf); + resourceManager.getResourceScheduler().reinitialize(conf, null); + + // ensure that if a user does not specify a Q, the group mapping is used + checkAppQueue(resourceManager, "user", null, Q2); + + // if the mapping specifies a queue that does not exist, the job is rejected + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, + "u:user:non_existent_queue"); + setupQueueConfiguration(csConf); + + boolean fail = false; + try { + resourceManager.getResourceScheduler().reinitialize(conf, null); + } + catch (IOException ioex) { + fail = true; + } + Assert.assertTrue("queue initialization failed for non-existent q", fail); + resourceManager.stop(); + } + + private void checkAppQueue(MockRM resourceManager, String user, + String submissionQueue, String expected) + throws Exception { + RMApp app = resourceManager.submitApp(200, "name", user, + new HashMap(), false, submissionQueue, -1, + null, "MAPREDUCE", false); + RMAppState expectedState = expected.isEmpty() ? RMAppState.FAILED + : RMAppState.ACCEPTED; + resourceManager.waitForState(app.getApplicationId(), expectedState); + // get scheduler app + CapacityScheduler cs = (CapacityScheduler) + resourceManager.getResourceScheduler(); + SchedulerApplication schedulerApp = + cs.getSchedulerApplications().get(app.getApplicationId()); + String queue = ""; + if (schedulerApp != null) { + queue = schedulerApp.getQueue().getQueueName(); + } + Assert.assertTrue("expected " + expected + " actual " + queue, + expected.equals(queue)); + Assert.assertEquals(expected, app.getQueue()); + } + + private void checkInvalidQMapping(YarnConfiguration conf, + CapacityScheduler cs, + String mapping, String reason) + throws IOException { + boolean fail = false; + try { + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, mapping); + cs.reinitialize(conf, null); + } catch (IOException ex) { + fail = true; + } + Assert.assertTrue("invalid mapping did not throw exception for " + reason, + fail); + } + + private void checkQMapping(String user, String expected, CapacityScheduler cs) + throws IOException { + String actual = cs.getMappedQueueForTest(user); + Assert.assertTrue("expected " + expected + " actual " + actual, + expected.equals(actual)); + } +}