YARN-9865. Capacity scheduler: add support for combined %user + %secondary_group mapping. Contributed by Manikandan R

This commit is contained in:
Szilard Nemeth 2019-11-11 13:27:10 +01:00
parent 516377bfa6
commit 30b93f914b
5 changed files with 159 additions and 71 deletions

View File

@ -157,6 +157,21 @@ public UserGroupMappingPlacementRule(boolean overrideWithQueueMappings,
this.groups = groups;
}
private String getSecondaryGroup(String user) throws IOException {
List<String> groupsList = groups.getGroups(user);
String secondaryGroup = null;
// Traverse all secondary groups (as there could be more than one
// and position is not guaranteed) and ensure there is queue with
// the same name
for (int i = 1; i < groupsList.size(); i++) {
if (this.queueManager.getQueue(groupsList.get(i)) != null) {
secondaryGroup = groupsList.get(i);
break;
}
}
return secondaryGroup;
}
private ApplicationPlacementContext getPlacementForUser(String user)
throws IOException {
for (QueueMapping mapping : mappings) {
@ -169,22 +184,27 @@ private ApplicationPlacementContext getPlacementForUser(String user)
new QueueMapping(mapping.getType(), mapping.getSource(),
CURRENT_USER_MAPPING, groups.getGroups(user).get(0)),
user);
} else if (mapping.getParentQueue() != null
&& mapping.getParentQueue().equals(SECONDARY_GROUP_MAPPING)
&& mapping.getQueue().equals(CURRENT_USER_MAPPING)) {
String secondaryGroup = getSecondaryGroup(user);
if (secondaryGroup != null) {
return getPlacementContext(new QueueMapping(mapping.getType(),
mapping.getSource(), CURRENT_USER_MAPPING, secondaryGroup),
user);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("User {} is not associated with any Secondary Group. "
+ "Hence it may use the 'default' queue", user);
}
return null;
}
} else if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
return getPlacementContext(mapping, user);
} else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
return getPlacementContext(mapping, groups.getGroups(user).get(0));
} else if (mapping.queue.equals(SECONDARY_GROUP_MAPPING)) {
List<String> groupsList = groups.getGroups(user);
String secondaryGroup = null;
// Traverse all secondary groups (as there could be more than one
// and position is not guaranteed) and ensure there is queue with
// the same name
for (int i = 1; i < groupsList.size(); i++) {
if (this.queueManager.getQueue(groupsList.get(i)) != null) {
secondaryGroup = groupsList.get(i);
break;
}
}
String secondaryGroup = getSecondaryGroup(user);
if (secondaryGroup != null) {
return getPlacementContext(mapping, secondaryGroup);
} else {
@ -383,7 +403,8 @@ private static QueueMapping validateAndGetAutoCreatedQueueMapping(
CapacitySchedulerQueueManager queueManager, QueueMapping mapping,
QueuePath queuePath) throws IOException {
if (queuePath.hasParentQueue()
&& queuePath.getParentQueue().equals(PRIMARY_GROUP_MAPPING)) {
&& (queuePath.getParentQueue().equals(PRIMARY_GROUP_MAPPING)
|| queuePath.getParentQueue().equals(SECONDARY_GROUP_MAPPING))) {
// dynamic parent queue
return new QueueMapping(mapping.getType(), mapping.getSource(),
queuePath.getLeafQueue(), queuePath.getParentQueue());

View File

@ -114,6 +114,10 @@ public void testMapping() throws YarnException {
verifyQueueMapping(
new QueueMapping(MappingType.USER, "%user", "%user", "%primary_group"),
"a", YarnConfiguration.DEFAULT_QUEUE_NAME, "a", false, "agroup");
verifyQueueMapping(
new QueueMapping(MappingType.USER, "%user", "%user",
"%secondary_group"),
"a", YarnConfiguration.DEFAULT_QUEUE_NAME, "a", false, "asubgroup2");
verifyQueueMapping(new QueueMapping(MappingType.GROUP, "asubgroup1", "q1"),
"a", "q1");

View File

@ -329,7 +329,7 @@ public static CapacitySchedulerConfiguration setupQueueConfiguration(
// Define top-level queues
// Set childQueue for root
conf.setQueues(ROOT,
new String[] { "a", "b", "c", "d" });
new String[] { "a", "b", "c", "d", "asubgroup1", "asubgroup2" });
conf.setCapacity(A, A_CAPACITY);
conf.setCapacity(B, B_CAPACITY);

View File

@ -132,61 +132,6 @@ public void testUpdatePlacementRulesFactory() throws Exception {
assertThat(placementRuleNames, hasItems(QUEUE_MAPPING_RULE_APP_NAME));
}
@Test
public void testNestedUserQueueWithDynamicParentQueue() throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
List<String> queuePlacementRules = new ArrayList<>();
queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP);
conf.setQueuePlacementRules(queuePlacementRules);
List<UserGroupMappingPlacementRule.QueueMapping> existingMappingsForUG =
conf.getQueueMappings();
// set queue mapping
List<UserGroupMappingPlacementRule.QueueMapping> queueMappingsForUG =
new ArrayList<>();
// u:%user:%primary_group.%user
UserGroupMappingPlacementRule.QueueMapping userQueueMapping =
new UserGroupMappingPlacementRule.QueueMapping(
UserGroupMappingPlacementRule.QueueMapping.MappingType.USER,
"%user", getQueueMapping("%primary_group", "%user"));
queueMappingsForUG.add(userQueueMapping);
existingMappingsForUG.addAll(queueMappingsForUG);
conf.setQueueMappings(existingMappingsForUG);
// override with queue mappings
conf.setOverrideWithQueueMappings(true);
mockRM = new MockRM(conf);
CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
cs.updatePlacementRules();
mockRM.start();
cs.start();
ApplicationSubmissionContext asc =
Records.newRecord(ApplicationSubmissionContext.class);
asc.setQueue("default");
String inputUser = "a";
List<PlacementRule> rules =
cs.getRMContext().getQueuePlacementManager().getPlacementRules();
UserGroupMappingPlacementRule r =
(UserGroupMappingPlacementRule) rules.get(0);
ApplicationPlacementContext ctx = r.getPlacementForApp(asc, inputUser);
assertEquals("Queue", "a", ctx.getQueue());
assertEquals("Group", "agroup", ctx.getParentQueue());
}
@Test
public void testNestedUserQueueWithStaticParentQueue() throws Exception {
@ -250,4 +195,120 @@ public void testNestedUserQueueWithStaticParentQueue() throws Exception {
assertEquals("Queue", "user2", ctx2.getQueue());
assertEquals("Queue", "c", ctx2.getParentQueue());
}
}
@Test
public void testNestedUserQueueWithPrimaryGroupAsDynamicParentQueue()
throws Exception {
/**
* Mapping order: 1. u:%user:%primary_group.%user 2.
* u:%user:%secondary_group.%user
*
* Expected parent queue is primary group of the user
*/
// set queue mapping
List<UserGroupMappingPlacementRule.QueueMapping> queueMappingsForUG =
new ArrayList<>();
// u:%user:%primary_group.%user
UserGroupMappingPlacementRule.QueueMapping userQueueMapping1 =
new UserGroupMappingPlacementRule.QueueMapping(
UserGroupMappingPlacementRule.QueueMapping.MappingType.USER,
"%user", getQueueMapping("%primary_group", "%user"));
// u:%user:%secondary_group.%user
UserGroupMappingPlacementRule.QueueMapping userQueueMapping2 =
new UserGroupMappingPlacementRule.QueueMapping(
UserGroupMappingPlacementRule.QueueMapping.MappingType.USER,
"%user", getQueueMapping("%secondary_group", "%user"));
queueMappingsForUG.add(userQueueMapping1);
queueMappingsForUG.add(userQueueMapping2);
testNestedUserQueueWithDynamicParentQueue(queueMappingsForUG, true);
}
@Test
public void testNestedUserQueueWithSecondaryGroupAsDynamicParentQueue()
throws Exception {
/**
* Mapping order: 1. u:%user:%secondary_group.%user 2.
* u:%user:%primary_group.%user
*
* Expected parent queue is secondary group of the user
*/
// set queue mapping
List<UserGroupMappingPlacementRule.QueueMapping> queueMappingsForUG =
new ArrayList<>();
// u:%user:%primary_group.%user
UserGroupMappingPlacementRule.QueueMapping userQueueMapping1 =
new UserGroupMappingPlacementRule.QueueMapping(
UserGroupMappingPlacementRule.QueueMapping.MappingType.USER,
"%user", getQueueMapping("%primary_group", "%user"));
// u:%user:%secondary_group.%user
UserGroupMappingPlacementRule.QueueMapping userQueueMapping2 =
new UserGroupMappingPlacementRule.QueueMapping(
UserGroupMappingPlacementRule.QueueMapping.MappingType.USER,
"%user", getQueueMapping("%secondary_group", "%user"));
queueMappingsForUG.add(userQueueMapping2);
queueMappingsForUG.add(userQueueMapping1);
testNestedUserQueueWithDynamicParentQueue(queueMappingsForUG, false);
}
private void testNestedUserQueueWithDynamicParentQueue(
List<UserGroupMappingPlacementRule.QueueMapping> mapping,
boolean primary)
throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
List<String> queuePlacementRules = new ArrayList<>();
queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP);
conf.setQueuePlacementRules(queuePlacementRules);
List<UserGroupMappingPlacementRule.QueueMapping> existingMappingsForUG =
conf.getQueueMappings();
existingMappingsForUG.addAll(mapping);
conf.setQueueMappings(existingMappingsForUG);
// override with queue mappings
conf.setOverrideWithQueueMappings(true);
mockRM = new MockRM(conf);
CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
cs.updatePlacementRules();
mockRM.start();
cs.start();
ApplicationSubmissionContext asc =
Records.newRecord(ApplicationSubmissionContext.class);
asc.setQueue("default");
List<PlacementRule> rules =
cs.getRMContext().getQueuePlacementManager().getPlacementRules();
UserGroupMappingPlacementRule r =
(UserGroupMappingPlacementRule) rules.get(0);
ApplicationPlacementContext ctx = r.getPlacementForApp(asc, "a");
assertEquals("Queue", "a", ctx.getQueue());
if (primary) {
assertEquals("Primary Group", "agroup", ctx.getParentQueue());
} else {
assertEquals("Secondary Group", "asubgroup1", ctx.getParentQueue());
}
mockRM.close();
}
}

View File

@ -170,13 +170,15 @@ Example:
```
<property>
<name>yarn.scheduler.capacity.queue-mappings</name>
<value>u:user1:queue1,g:group1:queue2,u:%user:%user,u:user2:%primary_group,u:user3:%secondary_group,u:%user:%primary_group.%user</value>
<value>u:user1:queue1,g:group1:queue2,u:%user:%user,u:user2:%primary_group,u:user3:%secondary_group,u:%user:%primary_group.%user,u:%user:%secondary_group.%user</value>
<description>
Here, <user1> is mapped to <queue1>, <group1> is mapped to <queue2>,
maps users to queues with the same name as user, <user2> is mapped
to queue name same as <primary group>, maps users to queue with the
same name as user but parent queue name should be same as <primary group>
of the user respectively. The mappings will be evaluated from left to
of the user, maps users to queue with the same name as user but parent
queue name should be same as any <secondary group> of the user
respectively. The mappings will be evaluated from left to
right, and the first valid mapping will be used.
</description>
</property>