diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index aa217edaec0..0bb4a8ccd68 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -47,6 +47,9 @@ Release 2.6.0 - UNRELEASED
YARN-2378. Added support for moving applications across queues in
CapacityScheduler. (Subramaniam Venkatraman Krishnan via jianhe)
+ YARN-2411. Support simple user and group mappings to queues. (Ram Venkatesh
+ via jianhe)
+
IMPROVEMENTS
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
index 2bed464bcc6..30f4eb96c6c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
@@ -108,4 +108,27 @@
+
+ yarn.scheduler.capacity.queue-mappings
+
+
+ A list of mappings that will be used to assign jobs to queues
+ The syntax for this list is [u|g]:[name]:[queue_name][,next mapping]*
+ Typically this list will be used to map users to queues,
+ for example, u:%user:%user maps all users to queues with the same name
+ as the user.
+
+
+
+
+ yarn.scheduler.capacity.queue-mappings-override.enable
+ false
+
+ If a queue mapping is present, will it override the value specified
+ by the user? This can be used by administrators to place jobs in queues
+ that are different than the one specified by the user.
+ The default is false.
+
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 1b31a1a6bbb..e979c5c2320 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-import com.google.common.base.Preconditions;
-
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@@ -41,6 +39,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -59,10 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -77,6 +73,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResour
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -94,6 +92,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
@LimitedPrivate("yarn")
@Evolving
@@ -199,6 +198,16 @@ public class CapacityScheduler extends
+ ".scheduling-interval-ms";
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
+ private boolean overrideWithQueueMappings = false;
+ private List mappings = new ArrayList();
+ private Groups groups;
+
+ @VisibleForTesting
+ public synchronized String getMappedQueueForTest(String user)
+ throws IOException {
+ return getMappedQueue(user);
+ }
+
public CapacityScheduler() {
super(CapacityScheduler.class.getName());
}
@@ -263,7 +272,6 @@ public class CapacityScheduler extends
this.applications =
new ConcurrentHashMap>();
-
initializeQueues(this.conf);
scheduleAsynchronously = this.conf.getScheduleAynschronously();
@@ -402,7 +410,32 @@ public class CapacityScheduler extends
}
}
private static final QueueHook noop = new QueueHook();
-
+
+ private void initializeQueueMappings() throws IOException {
+ overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
+ LOG.info("Initialized queue mappings, override: "
+ + overrideWithQueueMappings);
+ // Get new user/group mappings
+ List newMappings = conf.getQueueMappings();
+ //check if mappings refer to valid queues
+ for (QueueMapping mapping : newMappings) {
+ if (!mapping.queue.equals(CURRENT_USER_MAPPING) &&
+ !mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
+ CSQueue queue = queues.get(mapping.queue);
+ if (queue == null || !(queue instanceof LeafQueue)) {
+ throw new IOException(
+ "mapping contains invalid or non-leaf queue " + mapping.queue);
+ }
+ }
+ }
+ //apply the new mappings since they are valid
+ mappings = newMappings;
+ // initialize groups if mappings are present
+ if (mappings.size() > 0) {
+ groups = new Groups(conf);
+ }
+ }
+
@Lock(CapacityScheduler.class)
private void initializeQueues(CapacitySchedulerConfiguration conf)
throws IOException {
@@ -410,7 +443,9 @@ public class CapacityScheduler extends
root =
parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,
queues, queues, noop);
+
LOG.info("Initialized root queue " + root);
+ initializeQueueMappings();
}
@Lock(CapacityScheduler.class)
@@ -430,6 +465,7 @@ public class CapacityScheduler extends
// Re-configure queues
root.reinitialize(newRoot, clusterResource);
+ initializeQueueMappings();
}
/**
@@ -517,12 +553,73 @@ public class CapacityScheduler extends
}
synchronized CSQueue getQueue(String queueName) {
+ if (queueName == null) {
+ return null;
+ }
return queues.get(queueName);
}
+ private static final String CURRENT_USER_MAPPING = "%user";
+
+ private static final String PRIMARY_GROUP_MAPPING = "%primary_group";
+
+ private String getMappedQueue(String user) throws IOException {
+ for (QueueMapping mapping : mappings) {
+ if (mapping.type == MappingType.USER) {
+ if (mapping.source.equals(CURRENT_USER_MAPPING)) {
+ if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
+ return user;
+ }
+ else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
+ return groups.getGroups(user).get(0);
+ }
+ else {
+ return mapping.queue;
+ }
+ }
+ if (user.equals(mapping.source)) {
+ return mapping.queue;
+ }
+ }
+ if (mapping.type == MappingType.GROUP) {
+ for (String userGroups : groups.getGroups(user)) {
+ if (userGroups.equals(mapping.source)) {
+ return mapping.queue;
+ }
+ }
+ }
+ }
+ return null;
+ }
+
private synchronized void addApplication(ApplicationId applicationId,
- String queueName, String user, boolean isAppRecovering) {
- // santiy checks.
+ String queueName, String user, boolean isAppRecovering) {
+
+ if (mappings != null && mappings.size() > 0) {
+ try {
+ String mappedQueue = getMappedQueue(user);
+ if (mappedQueue != null) {
+ // We have a mapping, should we use it?
+ if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
+ || overrideWithQueueMappings) {
+ LOG.info("Application " + applicationId + " user " + user
+ + " mapping [" + queueName + "] to [" + mappedQueue
+ + "] override " + overrideWithQueueMappings);
+ queueName = mappedQueue;
+ RMApp rmApp = rmContext.getRMApps().get(applicationId);
+ rmApp.setQueue(queueName);
+ }
+ }
+ } catch (IOException ioex) {
+ String message = "Failed to submit application " + applicationId +
+ " submitted by user " + user + " reason: " + ioex.getMessage();
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppRejectedEvent(applicationId, message));
+ return;
+ }
+ }
+
+ // sanity checks.
CSQueue queue = getQueue(queueName);
if (queue == null) {
String message = "Application " + applicationId +
@@ -902,8 +999,8 @@ public class CapacityScheduler extends
{
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
addApplication(appAddedEvent.getApplicationId(),
- appAddedEvent.getQueue(), appAddedEvent.getUser(),
- appAddedEvent.getIsAppRecovering());
+ appAddedEvent.getQueue(),
+ appAddedEvent.getUser(), appAddedEvent.getIsAppRecovering());
}
break;
case APP_REMOVED:
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 6fe695ecda2..af6bdc301ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -18,8 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -145,6 +144,44 @@ public class CapacitySchedulerConfiguration extends Configuration {
@Private
public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false;
+
+ @Private
+ public static final String QUEUE_MAPPING = PREFIX + "queue-mappings";
+
+ @Private
+ public static final String ENABLE_QUEUE_MAPPING_OVERRIDE = QUEUE_MAPPING + "-override.enable";
+
+ @Private
+ public static final boolean DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE = false;
+
+ @Private
+ public static class QueueMapping {
+
+ public enum MappingType {
+
+ USER("u"),
+ GROUP("g");
+ private final String type;
+ private MappingType(String type) {
+ this.type = type;
+ }
+
+ public String toString() {
+ return type;
+ }
+
+ };
+
+ MappingType type;
+ String source;
+ String queue;
+
+ public QueueMapping(MappingType type, String source, String queue) {
+ this.type = type;
+ this.source = source;
+ this.queue = queue;
+ }
+ }
public CapacitySchedulerConfiguration() {
this(new Configuration());
@@ -378,4 +415,82 @@ public class CapacitySchedulerConfiguration extends Configuration {
setBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE, async);
}
+ public boolean getOverrideWithQueueMappings() {
+ return getBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE,
+ DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE);
+ }
+
+ /**
+ * Returns a collection of strings, trimming leading and trailing whitespeace
+ * on each value
+ *
+ * @param str
+ * String to parse
+ * @param delim
+ * delimiter to separate the values
+ * @return Collection of parsed elements.
+ */
+ private static Collection getTrimmedStringCollection(String str,
+ String delim) {
+ List values = new ArrayList();
+ if (str == null)
+ return values;
+ StringTokenizer tokenizer = new StringTokenizer(str, delim);
+ while (tokenizer.hasMoreTokens()) {
+ String next = tokenizer.nextToken();
+ if (next == null || next.trim().isEmpty()) {
+ continue;
+ }
+ values.add(next.trim());
+ }
+ return values;
+ }
+
+ /**
+ * Get user/group mappings to queues.
+ *
+ * @return user/groups mappings or null on illegal configs
+ */
+ public List getQueueMappings() {
+ List mappings =
+ new ArrayList();
+ Collection mappingsString =
+ getTrimmedStringCollection(QUEUE_MAPPING);
+ for (String mappingValue : mappingsString) {
+ String[] mapping =
+ getTrimmedStringCollection(mappingValue, ":")
+ .toArray(new String[] {});
+ if (mapping.length != 3 || mapping[1].length() == 0
+ || mapping[2].length() == 0) {
+ throw new IllegalArgumentException(
+ "Illegal queue mapping " + mappingValue);
+ }
+
+ QueueMapping m;
+ try {
+ QueueMapping.MappingType mappingType;
+ if (mapping[0].equals("u")) {
+ mappingType = QueueMapping.MappingType.USER;
+ } else if (mapping[0].equals("g")) {
+ mappingType = QueueMapping.MappingType.GROUP;
+ } else {
+ throw new IllegalArgumentException(
+ "unknown mapping prefix " + mapping[0]);
+ }
+ m = new QueueMapping(
+ mappingType,
+ mapping[1],
+ mapping[2]);
+ } catch (Throwable t) {
+ throw new IllegalArgumentException(
+ "Illegal queue mapping " + mappingValue);
+ }
+
+ if (m != null) {
+ mappings.add(m);
+ }
+ }
+
+ return mappings;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java
new file mode 100644
index 00000000000..f573f43f067
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.security.GroupMappingServiceProvider;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestQueueMappings {
+
+ private static final Log LOG = LogFactory.getLog(TestQueueMappings.class);
+
+ private static final String Q1 = "q1";
+ private static final String Q2 = "q2";
+
+ private final static String Q1_PATH =
+ CapacitySchedulerConfiguration.ROOT + "." + Q1;
+ private final static String Q2_PATH =
+ CapacitySchedulerConfiguration.ROOT + "." + Q2;
+
+ private MockRM resourceManager;
+
+ @After
+ public void tearDown() throws Exception {
+ if (resourceManager != null) {
+ LOG.info("Stopping the resource manager");
+ resourceManager.stop();
+ }
+ }
+
+ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
+ // Define top-level queues
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { Q1, Q2 });
+
+ conf.setCapacity(Q1_PATH, 10);
+ conf.setCapacity(Q2_PATH, 90);
+
+ LOG.info("Setup top-level queues q1 and q2");
+ }
+
+ @Test (timeout = 60000)
+ public void testQueueMapping() throws Exception {
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration();
+ setupQueueConfiguration(csConf);
+ YarnConfiguration conf = new YarnConfiguration(csConf);
+ CapacityScheduler cs = new CapacityScheduler();
+
+ RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
+ null, new RMContainerTokenSecretManager(conf),
+ new NMTokenSecretManagerInRM(conf),
+ new ClientToAMTokenSecretManagerInRM(), null);
+ cs.setConf(conf);
+ cs.setRMContext(rmContext);
+ cs.init(conf);
+ cs.start();
+
+ conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+ SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
+ conf.set(CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
+ "true");
+
+ // configuration parsing tests - negative test cases
+ checkInvalidQMapping(conf, cs, "x:a:b", "invalid specifier");
+ checkInvalidQMapping(conf, cs, "u:a", "no queue specified");
+ checkInvalidQMapping(conf, cs, "g:a", "no queue specified");
+ checkInvalidQMapping(conf, cs, "u:a:b,g:a",
+ "multiple mappings with invalid mapping");
+ checkInvalidQMapping(conf, cs, "u:a:b,g:a:d:e", "too many path segments");
+ checkInvalidQMapping(conf, cs, "u::", "empty source and queue");
+ checkInvalidQMapping(conf, cs, "u:", "missing source missing queue");
+ checkInvalidQMapping(conf, cs, "u:a:", "empty source missing q");
+
+ // simple base case for mapping user to queue
+ conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:a:" + Q1);
+ cs.reinitialize(conf, null);
+ checkQMapping("a", Q1, cs);
+
+ // group mapping test
+ conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:agroup:" + Q1);
+ cs.reinitialize(conf, null);
+ checkQMapping("a", Q1, cs);
+
+ // %user tests
+ conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:%user:" + Q2);
+ cs.reinitialize(conf, null);
+ checkQMapping("a", Q2, cs);
+
+ conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:%user:%user");
+ cs.reinitialize(conf, null);
+ checkQMapping("a", "a", cs);
+
+ // %primary_group tests
+ conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
+ "u:%user:%primary_group");
+ cs.reinitialize(conf, null);
+ checkQMapping("a", "agroup", cs);
+
+ // non-primary group mapping
+ conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
+ "g:asubgroup1:" + Q1);
+ cs.reinitialize(conf, null);
+ checkQMapping("a", Q1, cs);
+
+ // space trimming
+ conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, " u : a : " + Q1);
+ cs.reinitialize(conf, null);
+ checkQMapping("a", Q1, cs);
+
+ csConf = new CapacitySchedulerConfiguration();
+ setupQueueConfiguration(csConf);
+ conf = new YarnConfiguration(csConf);
+
+ resourceManager = new MockRM(csConf);
+ resourceManager.start();
+
+ conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+ SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
+ conf.set(CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
+ "true");
+ conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:user:" + Q1);
+ resourceManager.getResourceScheduler().reinitialize(conf, null);
+
+ // ensure that if the user specifies a Q that is still overriden
+ checkAppQueue(resourceManager, "user", Q2, Q1);
+
+ // toggle admin override and retry
+ conf.setBoolean(
+ CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
+ false);
+ conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:user:" + Q1);
+ setupQueueConfiguration(csConf);
+ resourceManager.getResourceScheduler().reinitialize(conf, null);
+
+ checkAppQueue(resourceManager, "user", Q2, Q2);
+
+ // ensure that if a user does not specify a Q, the user mapping is used
+ checkAppQueue(resourceManager, "user", null, Q1);
+
+ conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:usergroup:" + Q2);
+ setupQueueConfiguration(csConf);
+ resourceManager.getResourceScheduler().reinitialize(conf, null);
+
+ // ensure that if a user does not specify a Q, the group mapping is used
+ checkAppQueue(resourceManager, "user", null, Q2);
+
+ // if the mapping specifies a queue that does not exist, the job is rejected
+ conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
+ "u:user:non_existent_queue");
+ setupQueueConfiguration(csConf);
+
+ boolean fail = false;
+ try {
+ resourceManager.getResourceScheduler().reinitialize(conf, null);
+ }
+ catch (IOException ioex) {
+ fail = true;
+ }
+ Assert.assertTrue("queue initialization failed for non-existent q", fail);
+ resourceManager.stop();
+ }
+
+ private void checkAppQueue(MockRM resourceManager, String user,
+ String submissionQueue, String expected)
+ throws Exception {
+ RMApp app = resourceManager.submitApp(200, "name", user,
+ new HashMap(), false, submissionQueue, -1,
+ null, "MAPREDUCE", false);
+ RMAppState expectedState = expected.isEmpty() ? RMAppState.FAILED
+ : RMAppState.ACCEPTED;
+ resourceManager.waitForState(app.getApplicationId(), expectedState);
+ // get scheduler app
+ CapacityScheduler cs = (CapacityScheduler)
+ resourceManager.getResourceScheduler();
+ SchedulerApplication schedulerApp =
+ cs.getSchedulerApplications().get(app.getApplicationId());
+ String queue = "";
+ if (schedulerApp != null) {
+ queue = schedulerApp.getQueue().getQueueName();
+ }
+ Assert.assertTrue("expected " + expected + " actual " + queue,
+ expected.equals(queue));
+ Assert.assertEquals(expected, app.getQueue());
+ }
+
+ private void checkInvalidQMapping(YarnConfiguration conf,
+ CapacityScheduler cs,
+ String mapping, String reason)
+ throws IOException {
+ boolean fail = false;
+ try {
+ conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, mapping);
+ cs.reinitialize(conf, null);
+ } catch (IOException ex) {
+ fail = true;
+ }
+ Assert.assertTrue("invalid mapping did not throw exception for " + reason,
+ fail);
+ }
+
+ private void checkQMapping(String user, String expected, CapacityScheduler cs)
+ throws IOException {
+ String actual = cs.getMappedQueueForTest(user);
+ Assert.assertTrue("expected " + expected + " actual " + actual,
+ expected.equals(actual));
+ }
+}