YARN-9912. Capacity scheduler: support u:user2:%secondary_group queue mapping. Contributed by Manikandan R
This commit is contained in:
parent
b7ef8a333f
commit
621c5eac38
|
@ -168,6 +168,11 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (secondaryGroup == null && LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("User {} is not associated with any Secondary "
|
||||||
|
+ "Group. Hence it may use the 'default' queue", user);
|
||||||
|
}
|
||||||
return secondaryGroup;
|
return secondaryGroup;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -198,10 +203,6 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
|
||||||
validateQueueMapping(queueMapping);
|
validateQueueMapping(queueMapping);
|
||||||
return getPlacementContext(queueMapping, user);
|
return getPlacementContext(queueMapping, user);
|
||||||
} else {
|
} else {
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("User {} is not associated with any Secondary Group. "
|
|
||||||
+ "Hence it may use the 'default' queue", user);
|
|
||||||
}
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
} else if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
|
} else if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
|
||||||
|
@ -219,10 +220,6 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
|
||||||
if (secondaryGroup != null) {
|
if (secondaryGroup != null) {
|
||||||
return getPlacementContext(mapping, secondaryGroup);
|
return getPlacementContext(mapping, secondaryGroup);
|
||||||
} else {
|
} else {
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("User {} is not associated with any Secondary "
|
|
||||||
+ "Group. Hence it may use the 'default' queue", user);
|
|
||||||
}
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -232,6 +229,13 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
|
||||||
if (user.equals(mapping.source)) {
|
if (user.equals(mapping.source)) {
|
||||||
if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
|
if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
|
||||||
return getPlacementContext(mapping, groups.getGroups(user).get(0));
|
return getPlacementContext(mapping, groups.getGroups(user).get(0));
|
||||||
|
} else if (mapping.queue.equals(SECONDARY_GROUP_MAPPING)) {
|
||||||
|
String secondaryGroup = getSecondaryGroup(user);
|
||||||
|
if (secondaryGroup != null) {
|
||||||
|
return getPlacementContext(mapping, secondaryGroup);
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
return getPlacementContext(mapping);
|
return getPlacementContext(mapping);
|
||||||
}
|
}
|
||||||
|
|
|
@ -122,6 +122,7 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
||||||
public static final String B1 = B + ".b1";
|
public static final String B1 = B + ".b1";
|
||||||
public static final String B2 = B + ".b2";
|
public static final String B2 = B + ".b2";
|
||||||
public static final String B3 = B + ".b3";
|
public static final String B3 = B + ".b3";
|
||||||
|
public static final String B4 = B + ".b4subgroup1";
|
||||||
public static final String ASUBGROUP1_A = ASUBGROUP1 + ".e";
|
public static final String ASUBGROUP1_A = ASUBGROUP1 + ".e";
|
||||||
public static final String AGROUP_A = AGROUP + ".f";
|
public static final String AGROUP_A = AGROUP + ".f";
|
||||||
public static final float A_CAPACITY = 20f;
|
public static final float A_CAPACITY = 20f;
|
||||||
|
@ -135,8 +136,8 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
||||||
public static final float A2_CAPACITY = 70;
|
public static final float A2_CAPACITY = 70;
|
||||||
public static final float B1_CAPACITY = 60f;
|
public static final float B1_CAPACITY = 60f;
|
||||||
public static final float B2_CAPACITY = 20f;
|
public static final float B2_CAPACITY = 20f;
|
||||||
public static final float B3_CAPACITY = 20f;
|
public static final float B3_CAPACITY = 10f;
|
||||||
|
public static final float B4_CAPACITY = 10f;
|
||||||
public static final int NODE_MEMORY = 16;
|
public static final int NODE_MEMORY = 16;
|
||||||
|
|
||||||
public static final int NODE1_VCORES = 16;
|
public static final int NODE1_VCORES = 16;
|
||||||
|
@ -356,13 +357,15 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
||||||
conf.setCapacity(A2, A2_CAPACITY);
|
conf.setCapacity(A2, A2_CAPACITY);
|
||||||
conf.setUserLimitFactor(A2, 100.0f);
|
conf.setUserLimitFactor(A2, 100.0f);
|
||||||
|
|
||||||
conf.setQueues(B, new String[] { "b1", "b2", "b3" });
|
conf.setQueues(B, new String[] { "b1", "b2", "b3", "b4subgroup1" });
|
||||||
conf.setCapacity(B1, B1_CAPACITY);
|
conf.setCapacity(B1, B1_CAPACITY);
|
||||||
conf.setUserLimitFactor(B1, 100.0f);
|
conf.setUserLimitFactor(B1, 100.0f);
|
||||||
conf.setCapacity(B2, B2_CAPACITY);
|
conf.setCapacity(B2, B2_CAPACITY);
|
||||||
conf.setUserLimitFactor(B2, 100.0f);
|
conf.setUserLimitFactor(B2, 100.0f);
|
||||||
conf.setCapacity(B3, B3_CAPACITY);
|
conf.setCapacity(B3, B3_CAPACITY);
|
||||||
conf.setUserLimitFactor(B3, 100.0f);
|
conf.setUserLimitFactor(B3, 100.0f);
|
||||||
|
conf.setCapacity(B4, B4_CAPACITY);
|
||||||
|
conf.setUserLimitFactor(B4, 100.0f);
|
||||||
|
|
||||||
conf.setQueues(ASUBGROUP1, new String[] {"e"});
|
conf.setQueues(ASUBGROUP1, new String[] {"e"});
|
||||||
conf.setCapacity(ASUBGROUP1_A, 100f);
|
conf.setCapacity(ASUBGROUP1_A, 100f);
|
||||||
|
|
|
@ -236,8 +236,14 @@ public class TestCapacitySchedulerQueueMappingFactory {
|
||||||
UserGroupMappingPlacementRule.QueueMapping.MappingType.USER,
|
UserGroupMappingPlacementRule.QueueMapping.MappingType.USER,
|
||||||
"%user", getQueueMapping("%secondary_group", "%user"));
|
"%user", getQueueMapping("%secondary_group", "%user"));
|
||||||
|
|
||||||
|
// u:b4:%secondary_group
|
||||||
|
UserGroupMappingPlacementRule.QueueMapping userQueueMapping3 =
|
||||||
|
new UserGroupMappingPlacementRule.QueueMapping(
|
||||||
|
UserGroupMappingPlacementRule.QueueMapping.MappingType.USER,
|
||||||
|
"b4", "%secondary_group");
|
||||||
queueMappingsForUG.add(userQueueMapping1);
|
queueMappingsForUG.add(userQueueMapping1);
|
||||||
queueMappingsForUG.add(userQueueMapping2);
|
queueMappingsForUG.add(userQueueMapping2);
|
||||||
|
queueMappingsForUG.add(userQueueMapping3);
|
||||||
|
|
||||||
testNestedUserQueueWithDynamicParentQueue(queueMappingsForUG, true, "f");
|
testNestedUserQueueWithDynamicParentQueue(queueMappingsForUG, true, "f");
|
||||||
|
|
||||||
|
@ -416,4 +422,83 @@ public class TestCapacitySchedulerQueueMappingFactory {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFixedUserWithDynamicGroupQueue() throws Exception {
|
||||||
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||||
|
setupQueueConfiguration(conf);
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
|
||||||
|
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
|
||||||
|
|
||||||
|
List<String> queuePlacementRules = new ArrayList<>();
|
||||||
|
queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP);
|
||||||
|
conf.setQueuePlacementRules(queuePlacementRules);
|
||||||
|
|
||||||
|
List<UserGroupMappingPlacementRule.QueueMapping> existingMappingsForUG =
|
||||||
|
conf.getQueueMappings();
|
||||||
|
|
||||||
|
// set queue mapping
|
||||||
|
List<UserGroupMappingPlacementRule.QueueMapping> queueMappingsForUG =
|
||||||
|
new ArrayList<>();
|
||||||
|
|
||||||
|
// u:user1:b1
|
||||||
|
UserGroupMappingPlacementRule.QueueMapping userQueueMapping1 =
|
||||||
|
new UserGroupMappingPlacementRule.QueueMapping(
|
||||||
|
UserGroupMappingPlacementRule.QueueMapping.MappingType.USER,
|
||||||
|
"user1", "b1");
|
||||||
|
|
||||||
|
// u:user2:%primary_group
|
||||||
|
UserGroupMappingPlacementRule.QueueMapping userQueueMapping2 =
|
||||||
|
new UserGroupMappingPlacementRule.QueueMapping(
|
||||||
|
UserGroupMappingPlacementRule.QueueMapping.MappingType.USER,
|
||||||
|
"user2", "%primary_group");
|
||||||
|
|
||||||
|
// u:b4:%secondary_group
|
||||||
|
UserGroupMappingPlacementRule.QueueMapping userQueueMapping3 =
|
||||||
|
new UserGroupMappingPlacementRule.QueueMapping(
|
||||||
|
UserGroupMappingPlacementRule.QueueMapping.MappingType.USER, "b4",
|
||||||
|
"%secondary_group");
|
||||||
|
|
||||||
|
queueMappingsForUG.add(userQueueMapping1);
|
||||||
|
queueMappingsForUG.add(userQueueMapping2);
|
||||||
|
queueMappingsForUG.add(userQueueMapping3);
|
||||||
|
existingMappingsForUG.addAll(queueMappingsForUG);
|
||||||
|
conf.setQueueMappings(existingMappingsForUG);
|
||||||
|
|
||||||
|
//override with queue mappings
|
||||||
|
conf.setOverrideWithQueueMappings(true);
|
||||||
|
|
||||||
|
MockRM mockRM = null;
|
||||||
|
try {
|
||||||
|
mockRM = new MockRM(conf);
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
|
||||||
|
cs.updatePlacementRules();
|
||||||
|
mockRM.start();
|
||||||
|
cs.start();
|
||||||
|
|
||||||
|
ApplicationSubmissionContext asc =
|
||||||
|
Records.newRecord(ApplicationSubmissionContext.class);
|
||||||
|
asc.setQueue("default");
|
||||||
|
|
||||||
|
List<PlacementRule> rules =
|
||||||
|
cs.getRMContext().getQueuePlacementManager().getPlacementRules();
|
||||||
|
UserGroupMappingPlacementRule r =
|
||||||
|
(UserGroupMappingPlacementRule) rules.get(0);
|
||||||
|
|
||||||
|
ApplicationPlacementContext ctx = r.getPlacementForApp(asc, "user1");
|
||||||
|
assertEquals("Queue", "b1", ctx.getQueue());
|
||||||
|
|
||||||
|
ApplicationPlacementContext ctx1 = r.getPlacementForApp(asc, "user2");
|
||||||
|
assertEquals("Queue", "user2group", ctx1.getQueue());
|
||||||
|
|
||||||
|
ApplicationPlacementContext ctx2 = r.getPlacementForApp(asc, "b4");
|
||||||
|
assertEquals("Queue", "b4subgroup1", ctx2.getQueue());
|
||||||
|
} finally {
|
||||||
|
if (mockRM != null) {
|
||||||
|
mockRM.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue