YARN-2669. FairScheduler: queue names shouldn't allow periods (Wei Yan via Sandy Ryza)
This commit is contained in:
parent
1e9a3f42ff
commit
a128cca305
|
@ -89,6 +89,8 @@ Release 2.7.0 - UNRELEASED
|
||||||
|
|
||||||
YARN-2679. Add metric for container launch duration. (Zhihai Xu via kasha)
|
YARN-2679. Add metric for container launch duration. (Zhihai Xu via kasha)
|
||||||
|
|
||||||
|
YARN-2669. FairScheduler: queue names shouldn't allow periods
|
||||||
|
(Wei Yan via Sandy Ryza)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
|
|
|
@ -396,6 +396,12 @@ public class AllocationFileLoaderService extends AbstractService {
|
||||||
Map<FSQueueType, Set<String>> configuredQueues)
|
Map<FSQueueType, Set<String>> configuredQueues)
|
||||||
throws AllocationConfigurationException {
|
throws AllocationConfigurationException {
|
||||||
String queueName = element.getAttribute("name");
|
String queueName = element.getAttribute("name");
|
||||||
|
|
||||||
|
if (queueName.contains(".")) {
|
||||||
|
throw new AllocationConfigurationException("Bad fair scheduler config "
|
||||||
|
+ "file: queue name (" + queueName + ") shouldn't contain period.");
|
||||||
|
}
|
||||||
|
|
||||||
if (parentName != null) {
|
if (parentName != null) {
|
||||||
queueName = parentName + "." + queueName;
|
queueName = parentName + "." + queueName;
|
||||||
}
|
}
|
||||||
|
|
|
@ -585,6 +585,17 @@ public class FairScheduler extends
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (queueName.startsWith(".") || queueName.endsWith(".")) {
|
||||||
|
String message = "Reject application " + applicationId
|
||||||
|
+ " submitted by user " + user + " with an illegal queue name "
|
||||||
|
+ queueName + ". "
|
||||||
|
+ "The queue name cannot start/end with period.";
|
||||||
|
LOG.info(message);
|
||||||
|
rmContext.getDispatcher().getEventHandler()
|
||||||
|
.handle(new RMAppRejectedEvent(applicationId, message));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
RMApp rmApp = rmContext.getRMApps().get(applicationId);
|
RMApp rmApp = rmContext.getRMApps().get(applicationId);
|
||||||
FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
|
FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
|
||||||
if (queue == null) {
|
if (queue == null) {
|
||||||
|
|
|
@ -23,6 +23,8 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.security.Groups;
|
import org.apache.hadoop.security.Groups;
|
||||||
|
@ -38,7 +40,9 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
@Unstable
|
@Unstable
|
||||||
public abstract class QueuePlacementRule {
|
public abstract class QueuePlacementRule {
|
||||||
protected boolean create;
|
protected boolean create;
|
||||||
|
public static final Log LOG =
|
||||||
|
LogFactory.getLog(QueuePlacementRule.class.getName());
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initializes the rule with any arguments.
|
* Initializes the rule with any arguments.
|
||||||
*
|
*
|
||||||
|
@ -125,7 +129,7 @@ public abstract class QueuePlacementRule {
|
||||||
@Override
|
@Override
|
||||||
protected String getQueueForApp(String requestedQueue, String user,
|
protected String getQueueForApp(String requestedQueue, String user,
|
||||||
Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
|
Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
|
||||||
return "root." + user;
|
return "root." + cleanName(user);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -142,7 +146,7 @@ public abstract class QueuePlacementRule {
|
||||||
protected String getQueueForApp(String requestedQueue, String user,
|
protected String getQueueForApp(String requestedQueue, String user,
|
||||||
Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
|
Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return "root." + groups.getGroups(user).get(0);
|
return "root." + cleanName(groups.getGroups(user).get(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -164,11 +168,11 @@ public abstract class QueuePlacementRule {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
List<String> groupNames = groups.getGroups(user);
|
List<String> groupNames = groups.getGroups(user);
|
||||||
for (int i = 1; i < groupNames.size(); i++) {
|
for (int i = 1; i < groupNames.size(); i++) {
|
||||||
String group = groupNames.get(i);
|
String group = cleanName(groupNames.get(i));
|
||||||
if (configuredQueues.get(FSQueueType.LEAF).contains("root." + group)
|
if (configuredQueues.get(FSQueueType.LEAF).contains("root." + group)
|
||||||
|| configuredQueues.get(FSQueueType.PARENT).contains(
|
|| configuredQueues.get(FSQueueType.PARENT).contains(
|
||||||
"root." + group)) {
|
"root." + group)) {
|
||||||
return "root." + groupNames.get(i);
|
return "root." + group;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -241,7 +245,7 @@ public abstract class QueuePlacementRule {
|
||||||
if (configuredQueues.get(FSQueueType.LEAF).contains(queueName)) {
|
if (configuredQueues.get(FSQueueType.LEAF).contains(queueName)) {
|
||||||
return "";
|
return "";
|
||||||
}
|
}
|
||||||
return queueName + "." + user;
|
return queueName + "." + cleanName(user);
|
||||||
}
|
}
|
||||||
return queueName;
|
return queueName;
|
||||||
}
|
}
|
||||||
|
@ -339,4 +343,18 @@ public abstract class QueuePlacementRule {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Replace the periods in the username or groupname with "_dot_".
|
||||||
|
*/
|
||||||
|
protected String cleanName(String name) {
|
||||||
|
if (name.contains(".")) {
|
||||||
|
String converted = name.replaceAll("\\.", "_dot_");
|
||||||
|
LOG.warn("Name " + name + " is converted to " + converted
|
||||||
|
+ " when it is used as a queue name.");
|
||||||
|
return converted;
|
||||||
|
} else {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,44 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||||
|
|
||||||
|
import org.apache.hadoop.security.GroupMappingServiceProvider;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class PeriodGroupsMapping implements GroupMappingServiceProvider {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<String> getGroups(String user) {
|
||||||
|
return Arrays.asList(user + ".group", user + "subgroup1", user + "subgroup2");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void cacheGroupsRefresh() throws IOException {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void cacheGroupsAdd(List<String> groups) throws IOException {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -525,6 +525,30 @@ public class TestAllocationFileLoaderService {
|
||||||
allocLoader.setReloadListener(confHolder);
|
allocLoader.setReloadListener(confHolder);
|
||||||
allocLoader.reloadAllocations();
|
allocLoader.reloadAllocations();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify that you can't include periods as the queue name in the allocations
|
||||||
|
* file.
|
||||||
|
*/
|
||||||
|
@Test (expected = AllocationConfigurationException.class)
|
||||||
|
public void testQueueNameContainingPeriods() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
|
|
||||||
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
out.println("<allocations>");
|
||||||
|
out.println("<queue name=\"parent1.child1\">");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("</allocations>");
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
|
||||||
|
allocLoader.init(conf);
|
||||||
|
ReloadListener confHolder = new ReloadListener();
|
||||||
|
allocLoader.setReloadListener(confHolder);
|
||||||
|
allocLoader.reloadAllocations();
|
||||||
|
}
|
||||||
|
|
||||||
private class ReloadListener implements AllocationFileLoaderService.Listener {
|
private class ReloadListener implements AllocationFileLoaderService.Listener {
|
||||||
public AllocationConfiguration allocConf;
|
public AllocationConfiguration allocConf;
|
||||||
|
|
|
@ -916,6 +916,46 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
assertEquals(0, resourceManager.getRMContext().getRMApps().size());
|
assertEquals(0, resourceManager.getRMContext().getRMApps().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testQueueuNameWithPeriods() throws Exception {
|
||||||
|
scheduler.init(conf);
|
||||||
|
scheduler.start();
|
||||||
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
|
// only default queue
|
||||||
|
assertEquals(1, scheduler.getQueueManager().getLeafQueues().size());
|
||||||
|
|
||||||
|
// submit app with queue name (.A)
|
||||||
|
ApplicationAttemptId appAttemptId1 = createAppAttemptId(1, 1);
|
||||||
|
AppAddedSchedulerEvent appAddedEvent1 =
|
||||||
|
new AppAddedSchedulerEvent(appAttemptId1.getApplicationId(), ".A", "user1");
|
||||||
|
scheduler.handle(appAddedEvent1);
|
||||||
|
// submission rejected
|
||||||
|
assertEquals(1, scheduler.getQueueManager().getLeafQueues().size());
|
||||||
|
assertNull(scheduler.getSchedulerApp(appAttemptId1));
|
||||||
|
assertEquals(0, resourceManager.getRMContext().getRMApps().size());
|
||||||
|
|
||||||
|
// submit app with queue name (A.)
|
||||||
|
ApplicationAttemptId appAttemptId2 = createAppAttemptId(2, 1);
|
||||||
|
AppAddedSchedulerEvent appAddedEvent2 =
|
||||||
|
new AppAddedSchedulerEvent(appAttemptId2.getApplicationId(), "A.", "user1");
|
||||||
|
scheduler.handle(appAddedEvent2);
|
||||||
|
// submission rejected
|
||||||
|
assertEquals(1, scheduler.getQueueManager().getLeafQueues().size());
|
||||||
|
assertNull(scheduler.getSchedulerApp(appAttemptId2));
|
||||||
|
assertEquals(0, resourceManager.getRMContext().getRMApps().size());
|
||||||
|
|
||||||
|
// submit app with queue name (A.B)
|
||||||
|
ApplicationAttemptId appAttemptId3 = createAppAttemptId(3, 1);
|
||||||
|
AppAddedSchedulerEvent appAddedEvent3 =
|
||||||
|
new AppAddedSchedulerEvent(appAttemptId3.getApplicationId(), "A.B", "user1");
|
||||||
|
scheduler.handle(appAddedEvent3);
|
||||||
|
// submission accepted
|
||||||
|
assertEquals(2, scheduler.getQueueManager().getLeafQueues().size());
|
||||||
|
assertNull(scheduler.getSchedulerApp(appAttemptId3));
|
||||||
|
assertEquals(0, resourceManager.getRMContext().getRMApps().size());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAssignToQueue() throws Exception {
|
public void testAssignToQueue() throws Exception {
|
||||||
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
|
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
|
||||||
|
|
|
@ -345,7 +345,54 @@ public class TestQueuePlacementPolicy {
|
||||||
assertEquals("root.parentq.user1",
|
assertEquals("root.parentq.user1",
|
||||||
policy.assignAppToQueue("root.default", "user1"));
|
policy.assignAppToQueue("root.default", "user1"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUserContainsPeriod() throws Exception {
|
||||||
|
// This test covers the user case where the username contains periods.
|
||||||
|
StringBuffer sb = new StringBuffer();
|
||||||
|
sb.append("<queuePlacementPolicy>");
|
||||||
|
sb.append(" <rule name='user' />");
|
||||||
|
sb.append("</queuePlacementPolicy>");
|
||||||
|
QueuePlacementPolicy policy = parse(sb.toString());
|
||||||
|
assertEquals("root.first_dot_last",
|
||||||
|
policy.assignAppToQueue("default", "first.last"));
|
||||||
|
|
||||||
|
sb = new StringBuffer();
|
||||||
|
sb.append("<queuePlacementPolicy>");
|
||||||
|
sb.append(" <rule name='specified' create='false' />");
|
||||||
|
sb.append(" <rule name='nestedUserQueue'>");
|
||||||
|
sb.append(" <rule name='default'/>");
|
||||||
|
sb.append(" </rule>");
|
||||||
|
sb.append(" <rule name='default' />");
|
||||||
|
sb.append("</queuePlacementPolicy>");
|
||||||
|
policy = parse(sb.toString());
|
||||||
|
assertEquals("root.default.first_dot_last",
|
||||||
|
policy.assignAppToQueue("root.default", "first.last"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGroupContainsPeriod() throws Exception {
|
||||||
|
StringBuffer sb = new StringBuffer();
|
||||||
|
sb.append("<queuePlacementPolicy>");
|
||||||
|
sb.append(" <rule name='specified' create='false' />");
|
||||||
|
sb.append(" <rule name='nestedUserQueue'>");
|
||||||
|
sb.append(" <rule name='primaryGroup'/>");
|
||||||
|
sb.append(" </rule>");
|
||||||
|
sb.append(" <rule name='default' />");
|
||||||
|
sb.append("</queuePlacementPolicy>");
|
||||||
|
|
||||||
|
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
|
||||||
|
PeriodGroupsMapping.class, GroupMappingServiceProvider.class);
|
||||||
|
// User queue would be created under primary group queue, and the period
|
||||||
|
// in the group name should be converted into _dot_
|
||||||
|
QueuePlacementPolicy policy = parse(sb.toString());
|
||||||
|
assertEquals("root.user1_dot_group.user1",
|
||||||
|
policy.assignAppToQueue("root.default", "user1"));
|
||||||
|
|
||||||
|
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
|
||||||
|
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
|
||||||
|
}
|
||||||
|
|
||||||
private QueuePlacementPolicy parse(String str) throws Exception {
|
private QueuePlacementPolicy parse(String str) throws Exception {
|
||||||
// Read and parse the allocations file.
|
// Read and parse the allocations file.
|
||||||
DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory
|
DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory
|
||||||
|
|
|
@ -321,17 +321,25 @@ Allocation file format
|
||||||
continue. Valid rules are:
|
continue. Valid rules are:
|
||||||
|
|
||||||
* specified: the app is placed into the queue it requested. If the app
|
* specified: the app is placed into the queue it requested. If the app
|
||||||
requested no queue, i.e. it specified "default", we continue.
|
requested no queue, i.e. it specified "default", we continue. If the app
|
||||||
|
requested a queue name starting or ending with period, i.e. names like
|
||||||
|
".q1" or "q1." will be rejected.
|
||||||
|
|
||||||
* user: the app is placed into a queue with the name of the user who
|
* user: the app is placed into a queue with the name of the user who
|
||||||
submitted it.
|
submitted it. Periods in the username will be replace with "_dot_",
|
||||||
|
i.e. the queue name for user "first.last" is "first_dot_last".
|
||||||
|
|
||||||
* primaryGroup: the app is placed into a queue with the name of the
|
* primaryGroup: the app is placed into a queue with the name of the
|
||||||
primary group of the user who submitted it.
|
primary group of the user who submitted it. Periods in the group name
|
||||||
|
will be replaced with "_dot_", i.e. the queue name for group "one.two"
|
||||||
|
is "one_dot_two".
|
||||||
|
|
||||||
* secondaryGroupExistingQueue: the app is placed into a queue with a name
|
* secondaryGroupExistingQueue: the app is placed into a queue with a name
|
||||||
that matches a secondary group of the user who submitted it. The first
|
that matches a secondary group of the user who submitted it. The first
|
||||||
secondary group that matches a configured queue will be selected.
|
secondary group that matches a configured queue will be selected.
|
||||||
|
Periods in group names will be replaced with "_dot_", i.e. a user with
|
||||||
|
"one.two" as one of their secondary groups would be placed into the
|
||||||
|
"one_dot_two" queue, if such a queue exists.
|
||||||
|
|
||||||
* nestedUserQueue : the app is placed into a queue with the name of the user
|
* nestedUserQueue : the app is placed into a queue with the name of the user
|
||||||
under the queue suggested by the nested rule. This is similar to ‘user’
|
under the queue suggested by the nested rule. This is similar to ‘user’
|
||||||
|
|
Loading…
Reference in New Issue