YARN-9841. Capacity scheduler: add support for combined %user + %primary_group mapping. Contributed by Manikandan R
This commit is contained in:
parent
54dc6b7d72
commit
f0699a7406
|
@ -162,7 +162,14 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
|
||||||
for (QueueMapping mapping : mappings) {
|
for (QueueMapping mapping : mappings) {
|
||||||
if (mapping.type == MappingType.USER) {
|
if (mapping.type == MappingType.USER) {
|
||||||
if (mapping.source.equals(CURRENT_USER_MAPPING)) {
|
if (mapping.source.equals(CURRENT_USER_MAPPING)) {
|
||||||
if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
|
if (mapping.getParentQueue() != null
|
||||||
|
&& mapping.getParentQueue().equals(PRIMARY_GROUP_MAPPING)
|
||||||
|
&& mapping.getQueue().equals(CURRENT_USER_MAPPING)) {
|
||||||
|
return getPlacementContext(
|
||||||
|
new QueueMapping(mapping.getType(), mapping.getSource(),
|
||||||
|
CURRENT_USER_MAPPING, groups.getGroups(user).get(0)),
|
||||||
|
user);
|
||||||
|
} else if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
|
||||||
return getPlacementContext(mapping, user);
|
return getPlacementContext(mapping, user);
|
||||||
} else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
|
} else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
|
||||||
return getPlacementContext(mapping, groups.getGroups(user).get(0));
|
return getPlacementContext(mapping, groups.getGroups(user).get(0));
|
||||||
|
@ -375,7 +382,12 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
|
||||||
private static QueueMapping validateAndGetAutoCreatedQueueMapping(
|
private static QueueMapping validateAndGetAutoCreatedQueueMapping(
|
||||||
CapacitySchedulerQueueManager queueManager, QueueMapping mapping,
|
CapacitySchedulerQueueManager queueManager, QueueMapping mapping,
|
||||||
QueuePath queuePath) throws IOException {
|
QueuePath queuePath) throws IOException {
|
||||||
if (queuePath.hasParentQueue()) {
|
if (queuePath.hasParentQueue()
|
||||||
|
&& queuePath.getParentQueue().equals(PRIMARY_GROUP_MAPPING)) {
|
||||||
|
// dynamic parent queue
|
||||||
|
return new QueueMapping(mapping.getType(), mapping.getSource(),
|
||||||
|
queuePath.getLeafQueue(), queuePath.getParentQueue());
|
||||||
|
} else if (queuePath.hasParentQueue()) {
|
||||||
//if parent queue is specified,
|
//if parent queue is specified,
|
||||||
// then it should exist and be an instance of ManagedParentQueue
|
// then it should exist and be an instance of ManagedParentQueue
|
||||||
validateParentQueue(queueManager.getQueue(queuePath.getParentQueue()),
|
validateParentQueue(queueManager.getQueue(queuePath.getParentQueue()),
|
||||||
|
|
|
@ -57,6 +57,13 @@ public class TestUserGroupMappingPlacementRule {
|
||||||
|
|
||||||
private void verifyQueueMapping(QueueMapping queueMapping, String inputUser,
|
private void verifyQueueMapping(QueueMapping queueMapping, String inputUser,
|
||||||
String inputQueue, String expectedQueue, boolean overwrite) throws YarnException {
|
String inputQueue, String expectedQueue, boolean overwrite) throws YarnException {
|
||||||
|
verifyQueueMapping(queueMapping, inputUser, inputQueue, expectedQueue,
|
||||||
|
overwrite, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyQueueMapping(QueueMapping queueMapping, String inputUser,
|
||||||
|
String inputQueue, String expectedQueue, boolean overwrite,
|
||||||
|
String expectedParentQueue) throws YarnException {
|
||||||
Groups groups = new Groups(conf);
|
Groups groups = new Groups(conf);
|
||||||
UserGroupMappingPlacementRule rule = new UserGroupMappingPlacementRule(
|
UserGroupMappingPlacementRule rule = new UserGroupMappingPlacementRule(
|
||||||
overwrite, Arrays.asList(queueMapping), groups);
|
overwrite, Arrays.asList(queueMapping), groups);
|
||||||
|
@ -68,8 +75,12 @@ public class TestUserGroupMappingPlacementRule {
|
||||||
ApplicationSubmissionContext.class);
|
ApplicationSubmissionContext.class);
|
||||||
asc.setQueue(inputQueue);
|
asc.setQueue(inputQueue);
|
||||||
ApplicationPlacementContext ctx = rule.getPlacementForApp(asc, inputUser);
|
ApplicationPlacementContext ctx = rule.getPlacementForApp(asc, inputUser);
|
||||||
Assert.assertEquals(expectedQueue,
|
Assert.assertEquals("Queue", expectedQueue,
|
||||||
ctx != null ? ctx.getQueue() : inputQueue);
|
ctx != null ? ctx.getQueue() : inputQueue);
|
||||||
|
if (expectedParentQueue != null) {
|
||||||
|
Assert.assertEquals("Parent Queue", expectedParentQueue,
|
||||||
|
ctx.getParentQueue());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -97,35 +108,39 @@ public class TestUserGroupMappingPlacementRule {
|
||||||
"q2");
|
"q2");
|
||||||
verifyQueueMapping(new QueueMapping(MappingType.USER, "%user", "%user"),
|
verifyQueueMapping(new QueueMapping(MappingType.USER, "%user", "%user"),
|
||||||
"a", "a");
|
"a", "a");
|
||||||
verifyQueueMapping(new QueueMapping(MappingType.USER, "%user",
|
verifyQueueMapping(
|
||||||
"%primary_group"), "a", "agroup");
|
new QueueMapping(MappingType.USER, "%user", "%primary_group"), "a",
|
||||||
|
"agroup");
|
||||||
|
verifyQueueMapping(
|
||||||
|
new QueueMapping(MappingType.USER, "%user", "%user", "%primary_group"),
|
||||||
|
"a", YarnConfiguration.DEFAULT_QUEUE_NAME, "a", false, "agroup");
|
||||||
verifyQueueMapping(new QueueMapping(MappingType.GROUP, "asubgroup1", "q1"),
|
verifyQueueMapping(new QueueMapping(MappingType.GROUP, "asubgroup1", "q1"),
|
||||||
"a", "q1");
|
"a", "q1");
|
||||||
|
|
||||||
// specify overwritten, and see if user specified a queue, and it will be
|
// specify overwritten, and see if user specified a queue, and it will be
|
||||||
// overridden
|
// overridden
|
||||||
verifyQueueMapping(new QueueMapping(MappingType.USER, "user", "q1"),
|
verifyQueueMapping(new QueueMapping(MappingType.USER, "user", "q1"), "user",
|
||||||
"user", "q2", "q1", true);
|
"q2", "q1", true);
|
||||||
|
|
||||||
// if overwritten not specified, it should be which user specified
|
// if overwritten not specified, it should be which user specified
|
||||||
verifyQueueMapping(new QueueMapping(MappingType.USER, "user", "q1"),
|
verifyQueueMapping(new QueueMapping(MappingType.USER, "user", "q1"), "user",
|
||||||
"user", "q2", "q2", false);
|
"q2", "q2", false);
|
||||||
|
|
||||||
// if overwritten not specified, it should be which user specified
|
// if overwritten not specified, it should be which user specified
|
||||||
verifyQueueMapping(new QueueMapping(MappingType.GROUP, "usergroup",
|
verifyQueueMapping(
|
||||||
"%user", "usergroup"),
|
new QueueMapping(MappingType.GROUP, "usergroup", "%user", "usergroup"),
|
||||||
"user", "default", "user", false);
|
"user", "default", "user", false);
|
||||||
|
|
||||||
// if overwritten not specified, it should be which user specified
|
// if overwritten not specified, it should be which user specified
|
||||||
verifyQueueMapping(new QueueMapping(MappingType.GROUP, "usergroup",
|
verifyQueueMapping(
|
||||||
"%user", "usergroup"),
|
new QueueMapping(MappingType.GROUP, "usergroup", "%user", "usergroup"),
|
||||||
"user", "agroup", "user", true);
|
"user", "agroup", "user", true);
|
||||||
|
|
||||||
//If user specific queue is enabled for a specified group under a given
|
//If user specific queue is enabled for a specified group under a given
|
||||||
// parent queue
|
// parent queue
|
||||||
verifyQueueMapping(new QueueMapping(MappingType.GROUP, "agroup",
|
verifyQueueMapping(
|
||||||
"%user", "parent1"),
|
new QueueMapping(MappingType.GROUP, "agroup", "%user", "parent1"), "a",
|
||||||
"a", "a");
|
"a");
|
||||||
|
|
||||||
//If user specific queue is enabled for a specified group without parent
|
//If user specific queue is enabled for a specified group without parent
|
||||||
// queue
|
// queue
|
||||||
|
|
|
@ -18,12 +18,18 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
|
import org.apache.hadoop.security.GroupMappingServiceProvider;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMappingEntity;
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMappingEntity;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping;
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -32,7 +38,7 @@ import java.util.List;
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAutoCreatedQueueBase.getQueueMapping;
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAutoCreatedQueueBase.getQueueMapping;
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAutoCreatedQueueBase.setupQueueConfiguration;
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAutoCreatedQueueBase.setupQueueConfiguration;
|
||||||
import static org.hamcrest.CoreMatchers.hasItems;
|
import static org.hamcrest.CoreMatchers.hasItems;
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
public class TestCapacitySchedulerQueueMappingFactory {
|
public class TestCapacitySchedulerQueueMappingFactory {
|
||||||
|
|
||||||
|
@ -125,4 +131,123 @@ public class TestCapacitySchedulerQueueMappingFactory {
|
||||||
assertThat(placementRuleNames, hasItems(QUEUE_MAPPING_RULE_USER_GROUP));
|
assertThat(placementRuleNames, hasItems(QUEUE_MAPPING_RULE_USER_GROUP));
|
||||||
assertThat(placementRuleNames, hasItems(QUEUE_MAPPING_RULE_APP_NAME));
|
assertThat(placementRuleNames, hasItems(QUEUE_MAPPING_RULE_APP_NAME));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNestedUserQueueWithDynamicParentQueue() throws Exception {
|
||||||
|
|
||||||
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||||
|
setupQueueConfiguration(conf);
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
|
||||||
|
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
|
||||||
|
|
||||||
|
List<String> queuePlacementRules = new ArrayList<>();
|
||||||
|
queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP);
|
||||||
|
conf.setQueuePlacementRules(queuePlacementRules);
|
||||||
|
|
||||||
|
List<UserGroupMappingPlacementRule.QueueMapping> existingMappingsForUG =
|
||||||
|
conf.getQueueMappings();
|
||||||
|
|
||||||
|
// set queue mapping
|
||||||
|
List<UserGroupMappingPlacementRule.QueueMapping> queueMappingsForUG =
|
||||||
|
new ArrayList<>();
|
||||||
|
|
||||||
|
// u:%user:%primary_group.%user
|
||||||
|
UserGroupMappingPlacementRule.QueueMapping userQueueMapping =
|
||||||
|
new UserGroupMappingPlacementRule.QueueMapping(
|
||||||
|
UserGroupMappingPlacementRule.QueueMapping.MappingType.USER,
|
||||||
|
"%user", getQueueMapping("%primary_group", "%user"));
|
||||||
|
queueMappingsForUG.add(userQueueMapping);
|
||||||
|
|
||||||
|
existingMappingsForUG.addAll(queueMappingsForUG);
|
||||||
|
conf.setQueueMappings(existingMappingsForUG);
|
||||||
|
|
||||||
|
// override with queue mappings
|
||||||
|
conf.setOverrideWithQueueMappings(true);
|
||||||
|
|
||||||
|
mockRM = new MockRM(conf);
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
|
||||||
|
cs.updatePlacementRules();
|
||||||
|
mockRM.start();
|
||||||
|
cs.start();
|
||||||
|
|
||||||
|
ApplicationSubmissionContext asc =
|
||||||
|
Records.newRecord(ApplicationSubmissionContext.class);
|
||||||
|
asc.setQueue("default");
|
||||||
|
String inputUser = "a";
|
||||||
|
|
||||||
|
List<PlacementRule> rules =
|
||||||
|
cs.getRMContext().getQueuePlacementManager().getPlacementRules();
|
||||||
|
|
||||||
|
UserGroupMappingPlacementRule r =
|
||||||
|
(UserGroupMappingPlacementRule) rules.get(0);
|
||||||
|
ApplicationPlacementContext ctx = r.getPlacementForApp(asc, inputUser);
|
||||||
|
assertEquals("Queue", "a", ctx.getQueue());
|
||||||
|
assertEquals("Group", "agroup", ctx.getParentQueue());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNestedUserQueueWithStaticParentQueue() throws Exception {
|
||||||
|
|
||||||
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||||
|
setupQueueConfiguration(conf);
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
|
||||||
|
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
|
||||||
|
|
||||||
|
List<String> queuePlacementRules = new ArrayList<>();
|
||||||
|
queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP);
|
||||||
|
conf.setQueuePlacementRules(queuePlacementRules);
|
||||||
|
|
||||||
|
List<UserGroupMappingPlacementRule.QueueMapping> existingMappingsForUG =
|
||||||
|
conf.getQueueMappings();
|
||||||
|
|
||||||
|
// set queue mapping
|
||||||
|
List<UserGroupMappingPlacementRule.QueueMapping> queueMappingsForUG =
|
||||||
|
new ArrayList<>();
|
||||||
|
|
||||||
|
// u:user1:b1
|
||||||
|
UserGroupMappingPlacementRule.QueueMapping userQueueMapping1 =
|
||||||
|
new UserGroupMappingPlacementRule.QueueMapping(
|
||||||
|
UserGroupMappingPlacementRule.QueueMapping.MappingType.USER, "user1",
|
||||||
|
"b1");
|
||||||
|
// u:%user:parentqueue.%user
|
||||||
|
UserGroupMappingPlacementRule.QueueMapping userQueueMapping2 =
|
||||||
|
new UserGroupMappingPlacementRule.QueueMapping(
|
||||||
|
UserGroupMappingPlacementRule.QueueMapping.MappingType.USER, "%user",
|
||||||
|
getQueueMapping("c", "%user"));
|
||||||
|
queueMappingsForUG.add(userQueueMapping1);
|
||||||
|
queueMappingsForUG.add(userQueueMapping2);
|
||||||
|
|
||||||
|
existingMappingsForUG.addAll(queueMappingsForUG);
|
||||||
|
conf.setQueueMappings(existingMappingsForUG);
|
||||||
|
|
||||||
|
// override with queue mappings
|
||||||
|
conf.setOverrideWithQueueMappings(true);
|
||||||
|
|
||||||
|
mockRM = new MockRM(conf);
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
|
||||||
|
cs.updatePlacementRules();
|
||||||
|
mockRM.start();
|
||||||
|
cs.start();
|
||||||
|
|
||||||
|
ApplicationSubmissionContext asc =
|
||||||
|
Records.newRecord(ApplicationSubmissionContext.class);
|
||||||
|
asc.setQueue("default");
|
||||||
|
|
||||||
|
List<PlacementRule> rules =
|
||||||
|
cs.getRMContext().getQueuePlacementManager().getPlacementRules();
|
||||||
|
|
||||||
|
UserGroupMappingPlacementRule r =
|
||||||
|
(UserGroupMappingPlacementRule) rules.get(0);
|
||||||
|
|
||||||
|
ApplicationPlacementContext ctx = r.getPlacementForApp(asc, "user1");
|
||||||
|
assertEquals("Queue", "b1", ctx.getQueue());
|
||||||
|
|
||||||
|
ApplicationPlacementContext ctx2 = r.getPlacementForApp(asc, "user2");
|
||||||
|
assertEquals("Queue", "user2", ctx2.getQueue());
|
||||||
|
assertEquals("Queue", "c", ctx2.getParentQueue());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,12 +33,9 @@ public class SimpleGroupsMapping implements GroupMappingServiceProvider {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cacheGroupsRefresh() throws IOException {
|
public void cacheGroupsRefresh() throws IOException {
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cacheGroupsAdd(List<String> groups) throws IOException {
|
public void cacheGroupsAdd(List<String> groups) throws IOException {
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -170,12 +170,14 @@ Example:
|
||||||
```
|
```
|
||||||
<property>
|
<property>
|
||||||
<name>yarn.scheduler.capacity.queue-mappings</name>
|
<name>yarn.scheduler.capacity.queue-mappings</name>
|
||||||
<value>u:user1:queue1,g:group1:queue2,u:%user:%user,u:user2:%primary_group,u:user3:%secondary_group</value>
|
<value>u:user1:queue1,g:group1:queue2,u:%user:%user,u:user2:%primary_group,u:user3:%secondary_group,u:%user:%primary_group.%user</value>
|
||||||
<description>
|
<description>
|
||||||
Here, <user1> is mapped to <queue1>, <group1> is mapped to <queue2>,
|
Here, <user1> is mapped to <queue1>, <group1> is mapped to <queue2>,
|
||||||
maps users to queues with the same name as user, <user2> is mapped
|
maps users to queues with the same name as user, <user2> is mapped
|
||||||
to queue name same as <primary group> respectively. The mappings will be
|
to queue name same as <primary group>, maps users to queue with the
|
||||||
evaluated from left to right, and the first valid mapping will be used.
|
same name as user but parent queue name should be same as <primary group>
|
||||||
|
of the user respectively. The mappings will be evaluated from left to
|
||||||
|
right, and the first valid mapping will be used.
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue