YARN-2411. Support simple user and group mappings to queues. Contributed by Ram Venkatesh

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1618542 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jian He 2014-08-18 06:08:45 +00:00
parent 86840834ed
commit 519c4be95a
5 changed files with 492 additions and 14 deletions

View File

@ -47,6 +47,9 @@ Release 2.6.0 - UNRELEASED
YARN-2378. Added support for moving applications across queues in
CapacityScheduler. (Subramaniam Venkatraman Krishnan via jianhe)
YARN-2411. Support simple user and group mappings to queues. (Ram Venkatesh
via jianhe)
IMPROVEMENTS
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc

View File

@ -108,4 +108,27 @@
</description>
</property>
<property>
<name>yarn.scheduler.capacity.queue-mappings</name>
<value></value>
<description>
A list of mappings that will be used to assign jobs to queues
The syntax for this list is [u|g]:[name]:[queue_name][,next mapping]*
Typically this list will be used to map users to queues,
for example, u:%user:%user maps all users to queues with the same name
as the user.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.queue-mappings-override.enable</name>
<value>false</value>
<description>
If a queue mapping is present, will it override the value specified
by the user? This can be used by administrators to place jobs in queues
that are different than the one specified by the user.
The default is false.
</description>
</property>
</configuration>

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@ -41,6 +39,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -59,10 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@ -77,6 +73,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResour
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@ -94,6 +92,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@LimitedPrivate("yarn")
@Evolving
@ -199,6 +198,16 @@ public class CapacityScheduler extends
+ ".scheduling-interval-ms";
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
private boolean overrideWithQueueMappings = false;
private List<QueueMapping> mappings = new ArrayList<QueueMapping>();
private Groups groups;
@VisibleForTesting
public synchronized String getMappedQueueForTest(String user)
throws IOException {
return getMappedQueue(user);
}
public CapacityScheduler() {
super(CapacityScheduler.class.getName());
}
@ -263,7 +272,6 @@ public class CapacityScheduler extends
this.applications =
new ConcurrentHashMap<ApplicationId,
SchedulerApplication<FiCaSchedulerApp>>();
initializeQueues(this.conf);
scheduleAsynchronously = this.conf.getScheduleAynschronously();
@ -402,7 +410,32 @@ public class CapacityScheduler extends
}
}
private static final QueueHook noop = new QueueHook();
private void initializeQueueMappings() throws IOException {
overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
LOG.info("Initialized queue mappings, override: "
+ overrideWithQueueMappings);
// Get new user/group mappings
List<QueueMapping> newMappings = conf.getQueueMappings();
//check if mappings refer to valid queues
for (QueueMapping mapping : newMappings) {
if (!mapping.queue.equals(CURRENT_USER_MAPPING) &&
!mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
CSQueue queue = queues.get(mapping.queue);
if (queue == null || !(queue instanceof LeafQueue)) {
throw new IOException(
"mapping contains invalid or non-leaf queue " + mapping.queue);
}
}
}
//apply the new mappings since they are valid
mappings = newMappings;
// initialize groups if mappings are present
if (mappings.size() > 0) {
groups = new Groups(conf);
}
}
@Lock(CapacityScheduler.class)
private void initializeQueues(CapacitySchedulerConfiguration conf)
throws IOException {
@ -410,7 +443,9 @@ public class CapacityScheduler extends
root =
parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,
queues, queues, noop);
LOG.info("Initialized root queue " + root);
initializeQueueMappings();
}
@Lock(CapacityScheduler.class)
@ -430,6 +465,7 @@ public class CapacityScheduler extends
// Re-configure queues
root.reinitialize(newRoot, clusterResource);
initializeQueueMappings();
}
/**
@ -517,12 +553,73 @@ public class CapacityScheduler extends
}
synchronized CSQueue getQueue(String queueName) {
if (queueName == null) {
return null;
}
return queues.get(queueName);
}
private static final String CURRENT_USER_MAPPING = "%user";
private static final String PRIMARY_GROUP_MAPPING = "%primary_group";
private String getMappedQueue(String user) throws IOException {
for (QueueMapping mapping : mappings) {
if (mapping.type == MappingType.USER) {
if (mapping.source.equals(CURRENT_USER_MAPPING)) {
if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
return user;
}
else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
return groups.getGroups(user).get(0);
}
else {
return mapping.queue;
}
}
if (user.equals(mapping.source)) {
return mapping.queue;
}
}
if (mapping.type == MappingType.GROUP) {
for (String userGroups : groups.getGroups(user)) {
if (userGroups.equals(mapping.source)) {
return mapping.queue;
}
}
}
}
return null;
}
private synchronized void addApplication(ApplicationId applicationId,
String queueName, String user, boolean isAppRecovering) {
// santiy checks.
String queueName, String user, boolean isAppRecovering) {
if (mappings != null && mappings.size() > 0) {
try {
String mappedQueue = getMappedQueue(user);
if (mappedQueue != null) {
// We have a mapping, should we use it?
if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
|| overrideWithQueueMappings) {
LOG.info("Application " + applicationId + " user " + user
+ " mapping [" + queueName + "] to [" + mappedQueue
+ "] override " + overrideWithQueueMappings);
queueName = mappedQueue;
RMApp rmApp = rmContext.getRMApps().get(applicationId);
rmApp.setQueue(queueName);
}
}
} catch (IOException ioex) {
String message = "Failed to submit application " + applicationId +
" submitted by user " + user + " reason: " + ioex.getMessage();
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppRejectedEvent(applicationId, message));
return;
}
}
// sanity checks.
CSQueue queue = getQueue(queueName);
if (queue == null) {
String message = "Application " + applicationId +
@ -902,8 +999,8 @@ public class CapacityScheduler extends
{
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
addApplication(appAddedEvent.getApplicationId(),
appAddedEvent.getQueue(), appAddedEvent.getUser(),
appAddedEvent.getIsAppRecovering());
appAddedEvent.getQueue(),
appAddedEvent.getUser(), appAddedEvent.getIsAppRecovering());
}
break;
case APP_REMOVED:

View File

@ -18,8 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -145,6 +144,44 @@ public class CapacitySchedulerConfiguration extends Configuration {
@Private
public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false;
@Private
public static final String QUEUE_MAPPING = PREFIX + "queue-mappings";
@Private
public static final String ENABLE_QUEUE_MAPPING_OVERRIDE = QUEUE_MAPPING + "-override.enable";
@Private
public static final boolean DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE = false;
@Private
public static class QueueMapping {
public enum MappingType {
USER("u"),
GROUP("g");
private final String type;
private MappingType(String type) {
this.type = type;
}
public String toString() {
return type;
}
};
MappingType type;
String source;
String queue;
public QueueMapping(MappingType type, String source, String queue) {
this.type = type;
this.source = source;
this.queue = queue;
}
}
public CapacitySchedulerConfiguration() {
this(new Configuration());
@ -378,4 +415,82 @@ public class CapacitySchedulerConfiguration extends Configuration {
setBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE, async);
}
public boolean getOverrideWithQueueMappings() {
return getBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE,
DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE);
}
/**
* Returns a collection of strings, trimming leading and trailing whitespeace
* on each value
*
* @param str
* String to parse
* @param delim
* delimiter to separate the values
* @return Collection of parsed elements.
*/
private static Collection<String> getTrimmedStringCollection(String str,
String delim) {
List<String> values = new ArrayList<String>();
if (str == null)
return values;
StringTokenizer tokenizer = new StringTokenizer(str, delim);
while (tokenizer.hasMoreTokens()) {
String next = tokenizer.nextToken();
if (next == null || next.trim().isEmpty()) {
continue;
}
values.add(next.trim());
}
return values;
}
/**
* Get user/group mappings to queues.
*
* @return user/groups mappings or null on illegal configs
*/
public List<QueueMapping> getQueueMappings() {
List<QueueMapping> mappings =
new ArrayList<CapacitySchedulerConfiguration.QueueMapping>();
Collection<String> mappingsString =
getTrimmedStringCollection(QUEUE_MAPPING);
for (String mappingValue : mappingsString) {
String[] mapping =
getTrimmedStringCollection(mappingValue, ":")
.toArray(new String[] {});
if (mapping.length != 3 || mapping[1].length() == 0
|| mapping[2].length() == 0) {
throw new IllegalArgumentException(
"Illegal queue mapping " + mappingValue);
}
QueueMapping m;
try {
QueueMapping.MappingType mappingType;
if (mapping[0].equals("u")) {
mappingType = QueueMapping.MappingType.USER;
} else if (mapping[0].equals("g")) {
mappingType = QueueMapping.MappingType.GROUP;
} else {
throw new IllegalArgumentException(
"unknown mapping prefix " + mapping[0]);
}
m = new QueueMapping(
mappingType,
mapping[1],
mapping[2]);
} catch (Throwable t) {
throw new IllegalArgumentException(
"Illegal queue mapping " + mappingValue);
}
if (m != null) {
mappings.add(m);
}
}
return mappings;
}
}

View File

@ -0,0 +1,240 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.GroupMappingServiceProvider;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class TestQueueMappings {
private static final Log LOG = LogFactory.getLog(TestQueueMappings.class);
private static final String Q1 = "q1";
private static final String Q2 = "q2";
private final static String Q1_PATH =
CapacitySchedulerConfiguration.ROOT + "." + Q1;
private final static String Q2_PATH =
CapacitySchedulerConfiguration.ROOT + "." + Q2;
private MockRM resourceManager;
@After
public void tearDown() throws Exception {
if (resourceManager != null) {
LOG.info("Stopping the resource manager");
resourceManager.stop();
}
}
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { Q1, Q2 });
conf.setCapacity(Q1_PATH, 10);
conf.setCapacity(Q2_PATH, 90);
LOG.info("Setup top-level queues q1 and q2");
}
@Test (timeout = 60000)
public void testQueueMapping() throws Exception {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
setupQueueConfiguration(csConf);
YarnConfiguration conf = new YarnConfiguration(csConf);
CapacityScheduler cs = new CapacityScheduler();
RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), null);
cs.setConf(conf);
cs.setRMContext(rmContext);
cs.init(conf);
cs.start();
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
conf.set(CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
"true");
// configuration parsing tests - negative test cases
checkInvalidQMapping(conf, cs, "x:a:b", "invalid specifier");
checkInvalidQMapping(conf, cs, "u:a", "no queue specified");
checkInvalidQMapping(conf, cs, "g:a", "no queue specified");
checkInvalidQMapping(conf, cs, "u:a:b,g:a",
"multiple mappings with invalid mapping");
checkInvalidQMapping(conf, cs, "u:a:b,g:a:d:e", "too many path segments");
checkInvalidQMapping(conf, cs, "u::", "empty source and queue");
checkInvalidQMapping(conf, cs, "u:", "missing source missing queue");
checkInvalidQMapping(conf, cs, "u:a:", "empty source missing q");
// simple base case for mapping user to queue
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:a:" + Q1);
cs.reinitialize(conf, null);
checkQMapping("a", Q1, cs);
// group mapping test
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:agroup:" + Q1);
cs.reinitialize(conf, null);
checkQMapping("a", Q1, cs);
// %user tests
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:%user:" + Q2);
cs.reinitialize(conf, null);
checkQMapping("a", Q2, cs);
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:%user:%user");
cs.reinitialize(conf, null);
checkQMapping("a", "a", cs);
// %primary_group tests
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
"u:%user:%primary_group");
cs.reinitialize(conf, null);
checkQMapping("a", "agroup", cs);
// non-primary group mapping
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
"g:asubgroup1:" + Q1);
cs.reinitialize(conf, null);
checkQMapping("a", Q1, cs);
// space trimming
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, " u : a : " + Q1);
cs.reinitialize(conf, null);
checkQMapping("a", Q1, cs);
csConf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(csConf);
conf = new YarnConfiguration(csConf);
resourceManager = new MockRM(csConf);
resourceManager.start();
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
conf.set(CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
"true");
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:user:" + Q1);
resourceManager.getResourceScheduler().reinitialize(conf, null);
// ensure that if the user specifies a Q that is still overriden
checkAppQueue(resourceManager, "user", Q2, Q1);
// toggle admin override and retry
conf.setBoolean(
CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
false);
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:user:" + Q1);
setupQueueConfiguration(csConf);
resourceManager.getResourceScheduler().reinitialize(conf, null);
checkAppQueue(resourceManager, "user", Q2, Q2);
// ensure that if a user does not specify a Q, the user mapping is used
checkAppQueue(resourceManager, "user", null, Q1);
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:usergroup:" + Q2);
setupQueueConfiguration(csConf);
resourceManager.getResourceScheduler().reinitialize(conf, null);
// ensure that if a user does not specify a Q, the group mapping is used
checkAppQueue(resourceManager, "user", null, Q2);
// if the mapping specifies a queue that does not exist, the job is rejected
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
"u:user:non_existent_queue");
setupQueueConfiguration(csConf);
boolean fail = false;
try {
resourceManager.getResourceScheduler().reinitialize(conf, null);
}
catch (IOException ioex) {
fail = true;
}
Assert.assertTrue("queue initialization failed for non-existent q", fail);
resourceManager.stop();
}
private void checkAppQueue(MockRM resourceManager, String user,
String submissionQueue, String expected)
throws Exception {
RMApp app = resourceManager.submitApp(200, "name", user,
new HashMap<ApplicationAccessType, String>(), false, submissionQueue, -1,
null, "MAPREDUCE", false);
RMAppState expectedState = expected.isEmpty() ? RMAppState.FAILED
: RMAppState.ACCEPTED;
resourceManager.waitForState(app.getApplicationId(), expectedState);
// get scheduler app
CapacityScheduler cs = (CapacityScheduler)
resourceManager.getResourceScheduler();
SchedulerApplication schedulerApp =
cs.getSchedulerApplications().get(app.getApplicationId());
String queue = "";
if (schedulerApp != null) {
queue = schedulerApp.getQueue().getQueueName();
}
Assert.assertTrue("expected " + expected + " actual " + queue,
expected.equals(queue));
Assert.assertEquals(expected, app.getQueue());
}
private void checkInvalidQMapping(YarnConfiguration conf,
CapacityScheduler cs,
String mapping, String reason)
throws IOException {
boolean fail = false;
try {
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, mapping);
cs.reinitialize(conf, null);
} catch (IOException ex) {
fail = true;
}
Assert.assertTrue("invalid mapping did not throw exception for " + reason,
fail);
}
private void checkQMapping(String user, String expected, CapacityScheduler cs)
throws IOException {
String actual = cs.getMappedQueueForTest(user);
Assert.assertTrue("expected " + expected + " actual " + actual,
expected.equals(actual));
}
}