YARN-9938. Validate Parent Queue for QueueMapping contains dynamic group as parent queue. Contributed by Manikandan R
This commit is contained in:
parent
0ab9c0398d
commit
cf68857631
|
@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
|
||||||
|
|
||||||
public class UserGroupMappingPlacementRule extends PlacementRule {
|
public class UserGroupMappingPlacementRule extends PlacementRule {
|
||||||
private static final Logger LOG = LoggerFactory
|
private static final Logger LOG = LoggerFactory
|
||||||
|
@ -178,18 +179,20 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
|
||||||
if (mapping.getParentQueue() != null
|
if (mapping.getParentQueue() != null
|
||||||
&& mapping.getParentQueue().equals(PRIMARY_GROUP_MAPPING)
|
&& mapping.getParentQueue().equals(PRIMARY_GROUP_MAPPING)
|
||||||
&& mapping.getQueue().equals(CURRENT_USER_MAPPING)) {
|
&& mapping.getQueue().equals(CURRENT_USER_MAPPING)) {
|
||||||
return getPlacementContext(
|
QueueMapping queueMapping =
|
||||||
new QueueMapping(mapping.getType(), mapping.getSource(),
|
new QueueMapping(mapping.getType(), mapping.getSource(),
|
||||||
CURRENT_USER_MAPPING, groups.getGroups(user).get(0)),
|
user, groups.getGroups(user).get(0));
|
||||||
user);
|
validateQueueMapping(queueMapping);
|
||||||
|
return getPlacementContext(queueMapping, user);
|
||||||
} else if (mapping.getParentQueue() != null
|
} else if (mapping.getParentQueue() != null
|
||||||
&& mapping.getParentQueue().equals(SECONDARY_GROUP_MAPPING)
|
&& mapping.getParentQueue().equals(SECONDARY_GROUP_MAPPING)
|
||||||
&& mapping.getQueue().equals(CURRENT_USER_MAPPING)) {
|
&& mapping.getQueue().equals(CURRENT_USER_MAPPING)) {
|
||||||
String secondaryGroup = getSecondaryGroup(user);
|
String secondaryGroup = getSecondaryGroup(user);
|
||||||
if (secondaryGroup != null) {
|
if (secondaryGroup != null) {
|
||||||
return getPlacementContext(new QueueMapping(mapping.getType(),
|
QueueMapping queueMapping = new QueueMapping(mapping.getType(),
|
||||||
mapping.getSource(), CURRENT_USER_MAPPING, secondaryGroup),
|
mapping.getSource(), user, secondaryGroup);
|
||||||
user);
|
validateQueueMapping(queueMapping);
|
||||||
|
return getPlacementContext(queueMapping, user);
|
||||||
} else {
|
} else {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("User {} is not associated with any Secondary Group. "
|
LOG.debug("User {} is not associated with any Secondary Group. "
|
||||||
|
@ -429,6 +432,28 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
|
||||||
.contains(UserGroupMappingPlacementRule.SECONDARY_GROUP_MAPPING);
|
.contains(UserGroupMappingPlacementRule.SECONDARY_GROUP_MAPPING);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void validateQueueMapping(QueueMapping queueMapping)
|
||||||
|
throws IOException {
|
||||||
|
String parentQueueName = queueMapping.getParentQueue();
|
||||||
|
String leafQueueName = queueMapping.getQueue();
|
||||||
|
CSQueue parentQueue = queueManager.getQueue(parentQueueName);
|
||||||
|
CSQueue leafQueue = queueManager.getQueue(leafQueueName);
|
||||||
|
|
||||||
|
if (leafQueue == null || (!(leafQueue instanceof LeafQueue))) {
|
||||||
|
throw new IOException("mapping contains invalid or non-leaf queue : "
|
||||||
|
+ leafQueueName);
|
||||||
|
} else if (parentQueue == null || (!(parentQueue instanceof ParentQueue))) {
|
||||||
|
throw new IOException(
|
||||||
|
"mapping contains invalid parent queue [" + parentQueueName + "]");
|
||||||
|
} else if (!parentQueue.getQueueName()
|
||||||
|
.equals(leafQueue.getParent().getQueueName())) {
|
||||||
|
throw new IOException("mapping contains invalid parent queue "
|
||||||
|
+ "which does not match existing leaf queue's parent : ["
|
||||||
|
+ parentQueue.getQueueName() + "] does not match [ "
|
||||||
|
+ leafQueue.getParent().getQueueName() + "]");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public List<QueueMapping> getQueueMappings() {
|
public List<QueueMapping> getQueueMappings() {
|
||||||
return mappings;
|
return mappings;
|
||||||
|
|
|
@ -31,8 +31,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType;
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.PrimaryGroupMapping;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.PrimaryGroupMapping;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
@ -69,7 +70,26 @@ public class TestUserGroupMappingPlacementRule {
|
||||||
overwrite, Arrays.asList(queueMapping), groups);
|
overwrite, Arrays.asList(queueMapping), groups);
|
||||||
CapacitySchedulerQueueManager queueManager =
|
CapacitySchedulerQueueManager queueManager =
|
||||||
mock(CapacitySchedulerQueueManager.class);
|
mock(CapacitySchedulerQueueManager.class);
|
||||||
when(queueManager.getQueue("asubgroup2")).thenReturn(mock(CSQueue.class));
|
|
||||||
|
ParentQueue agroup = mock(ParentQueue.class);
|
||||||
|
when(agroup.getQueueName()).thenReturn("agroup");
|
||||||
|
ParentQueue bsubgroup2 = mock(ParentQueue.class);
|
||||||
|
when(bsubgroup2.getQueueName()).thenReturn("bsubgroup2");
|
||||||
|
|
||||||
|
LeafQueue a = mock(LeafQueue.class);
|
||||||
|
when(a.getQueueName()).thenReturn("a");
|
||||||
|
when(a.getParent()).thenReturn(agroup);
|
||||||
|
LeafQueue b = mock(LeafQueue.class);
|
||||||
|
when(b.getQueueName()).thenReturn("b");
|
||||||
|
when(b.getParent()).thenReturn(bsubgroup2);
|
||||||
|
LeafQueue asubgroup2 = mock(LeafQueue.class);
|
||||||
|
when(asubgroup2.getQueueName()).thenReturn("asubgroup2");
|
||||||
|
|
||||||
|
when(queueManager.getQueue("a")).thenReturn(a);
|
||||||
|
when(queueManager.getQueue("b")).thenReturn(b);
|
||||||
|
when(queueManager.getQueue("agroup")).thenReturn(agroup);
|
||||||
|
when(queueManager.getQueue("bsubgroup2")).thenReturn(bsubgroup2);
|
||||||
|
when(queueManager.getQueue("asubgroup2")).thenReturn(asubgroup2);
|
||||||
rule.setQueueManager(queueManager);
|
rule.setQueueManager(queueManager);
|
||||||
ApplicationSubmissionContext asc = Records.newRecord(
|
ApplicationSubmissionContext asc = Records.newRecord(
|
||||||
ApplicationSubmissionContext.class);
|
ApplicationSubmissionContext.class);
|
||||||
|
@ -117,7 +137,7 @@ public class TestUserGroupMappingPlacementRule {
|
||||||
verifyQueueMapping(
|
verifyQueueMapping(
|
||||||
new QueueMapping(MappingType.USER, "%user", "%user",
|
new QueueMapping(MappingType.USER, "%user", "%user",
|
||||||
"%secondary_group"),
|
"%secondary_group"),
|
||||||
"a", YarnConfiguration.DEFAULT_QUEUE_NAME, "a", false, "asubgroup2");
|
"b", YarnConfiguration.DEFAULT_QUEUE_NAME, "b", false, "bsubgroup2");
|
||||||
verifyQueueMapping(new QueueMapping(MappingType.GROUP, "asubgroup1", "q1"),
|
verifyQueueMapping(new QueueMapping(MappingType.GROUP, "asubgroup1", "q1"),
|
||||||
"a", "q1");
|
"a", "q1");
|
||||||
|
|
||||||
|
|
|
@ -112,15 +112,24 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
||||||
public static final String C = CapacitySchedulerConfiguration.ROOT + ".c";
|
public static final String C = CapacitySchedulerConfiguration.ROOT + ".c";
|
||||||
public static final String D = CapacitySchedulerConfiguration.ROOT + ".d";
|
public static final String D = CapacitySchedulerConfiguration.ROOT + ".d";
|
||||||
public static final String E = CapacitySchedulerConfiguration.ROOT + ".e";
|
public static final String E = CapacitySchedulerConfiguration.ROOT + ".e";
|
||||||
|
public static final String ASUBGROUP1 =
|
||||||
|
CapacitySchedulerConfiguration.ROOT + ".esubgroup1";
|
||||||
|
public static final String AGROUP =
|
||||||
|
CapacitySchedulerConfiguration.ROOT + ".fgroup";
|
||||||
public static final String A1 = A + ".a1";
|
public static final String A1 = A + ".a1";
|
||||||
public static final String A2 = A + ".a2";
|
public static final String A2 = A + ".a2";
|
||||||
public static final String B1 = B + ".b1";
|
public static final String B1 = B + ".b1";
|
||||||
public static final String B2 = B + ".b2";
|
public static final String B2 = B + ".b2";
|
||||||
public static final String B3 = B + ".b3";
|
public static final String B3 = B + ".b3";
|
||||||
|
public static final String ASUBGROUP1_A = ASUBGROUP1 + ".e";
|
||||||
|
public static final String AGROUP_A = AGROUP + ".f";
|
||||||
public static final float A_CAPACITY = 20f;
|
public static final float A_CAPACITY = 20f;
|
||||||
public static final float B_CAPACITY = 40f;
|
public static final float B_CAPACITY = 20f;
|
||||||
public static final float C_CAPACITY = 20f;
|
public static final float C_CAPACITY = 20f;
|
||||||
public static final float D_CAPACITY = 20f;
|
public static final float D_CAPACITY = 20f;
|
||||||
|
public static final float ASUBGROUP1_CAPACITY = 10f;
|
||||||
|
public static final float AGROUP_CAPACITY = 10f;
|
||||||
|
|
||||||
public static final float A1_CAPACITY = 30;
|
public static final float A1_CAPACITY = 30;
|
||||||
public static final float A2_CAPACITY = 70;
|
public static final float A2_CAPACITY = 70;
|
||||||
public static final float B1_CAPACITY = 60f;
|
public static final float B1_CAPACITY = 60f;
|
||||||
|
@ -328,12 +337,15 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
||||||
// Define top-level queues
|
// Define top-level queues
|
||||||
// Set childQueue for root
|
// Set childQueue for root
|
||||||
conf.setQueues(ROOT,
|
conf.setQueues(ROOT,
|
||||||
new String[] { "a", "b", "c", "d", "asubgroup1", "asubgroup2" });
|
new String[] {"a", "b", "c", "d", "esubgroup1", "asubgroup2",
|
||||||
|
"fgroup"});
|
||||||
|
|
||||||
conf.setCapacity(A, A_CAPACITY);
|
conf.setCapacity(A, A_CAPACITY);
|
||||||
conf.setCapacity(B, B_CAPACITY);
|
conf.setCapacity(B, B_CAPACITY);
|
||||||
conf.setCapacity(C, C_CAPACITY);
|
conf.setCapacity(C, C_CAPACITY);
|
||||||
conf.setCapacity(D, D_CAPACITY);
|
conf.setCapacity(D, D_CAPACITY);
|
||||||
|
conf.setCapacity(ASUBGROUP1, ASUBGROUP1_CAPACITY);
|
||||||
|
conf.setCapacity(AGROUP, AGROUP_CAPACITY);
|
||||||
|
|
||||||
// Define 2nd-level queues
|
// Define 2nd-level queues
|
||||||
conf.setQueues(A, new String[] { "a1", "a2" });
|
conf.setQueues(A, new String[] { "a1", "a2" });
|
||||||
|
@ -350,6 +362,13 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
||||||
conf.setCapacity(B3, B3_CAPACITY);
|
conf.setCapacity(B3, B3_CAPACITY);
|
||||||
conf.setUserLimitFactor(B3, 100.0f);
|
conf.setUserLimitFactor(B3, 100.0f);
|
||||||
|
|
||||||
|
conf.setQueues(ASUBGROUP1, new String[] {"e"});
|
||||||
|
conf.setCapacity(ASUBGROUP1_A, 100f);
|
||||||
|
conf.setUserLimitFactor(ASUBGROUP1_A, 100.0f);
|
||||||
|
conf.setQueues(AGROUP, new String[] {"f"});
|
||||||
|
conf.setCapacity(AGROUP_A, 100f);
|
||||||
|
conf.setUserLimitFactor(AGROUP_A, 100.0f);
|
||||||
|
|
||||||
conf.setUserLimitFactor(C, 1.0f);
|
conf.setUserLimitFactor(C, 1.0f);
|
||||||
conf.setAutoCreateChildQueueEnabled(C, true);
|
conf.setAutoCreateChildQueueEnabled(C, true);
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.security.GroupMappingServiceProvider;
|
import org.apache.hadoop.security.GroupMappingServiceProvider;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
|
||||||
|
@ -226,7 +227,23 @@ public class TestCapacitySchedulerQueueMappingFactory {
|
||||||
queueMappingsForUG.add(userQueueMapping1);
|
queueMappingsForUG.add(userQueueMapping1);
|
||||||
queueMappingsForUG.add(userQueueMapping2);
|
queueMappingsForUG.add(userQueueMapping2);
|
||||||
|
|
||||||
testNestedUserQueueWithDynamicParentQueue(queueMappingsForUG, true);
|
testNestedUserQueueWithDynamicParentQueue(queueMappingsForUG, true, "f");
|
||||||
|
|
||||||
|
try {
|
||||||
|
testNestedUserQueueWithDynamicParentQueue(queueMappingsForUG, true, "h");
|
||||||
|
fail("Leaf Queue 'h' doesn't exists");
|
||||||
|
} catch (YarnException e) {
|
||||||
|
// Exception is expected as there is no such leaf queue
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
testNestedUserQueueWithDynamicParentQueue(queueMappingsForUG, true, "a1");
|
||||||
|
fail("Actual Parent Queue of Leaf Queue 'a1' is 'a', but as per queue "
|
||||||
|
+ "mapping it returns primary queue as 'a1group'");
|
||||||
|
} catch (YarnException e) {
|
||||||
|
// Exception is expected as there is mismatch in expected and actual
|
||||||
|
// parent queue
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -259,12 +276,12 @@ public class TestCapacitySchedulerQueueMappingFactory {
|
||||||
queueMappingsForUG.add(userQueueMapping2);
|
queueMappingsForUG.add(userQueueMapping2);
|
||||||
queueMappingsForUG.add(userQueueMapping1);
|
queueMappingsForUG.add(userQueueMapping1);
|
||||||
|
|
||||||
testNestedUserQueueWithDynamicParentQueue(queueMappingsForUG, false);
|
testNestedUserQueueWithDynamicParentQueue(queueMappingsForUG, false, "e");
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testNestedUserQueueWithDynamicParentQueue(
|
private void testNestedUserQueueWithDynamicParentQueue(
|
||||||
List<UserGroupMappingPlacementRule.QueueMapping> mapping,
|
List<UserGroupMappingPlacementRule.QueueMapping> mapping, boolean primary,
|
||||||
boolean primary)
|
String user)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||||
setupQueueConfiguration(conf);
|
setupQueueConfiguration(conf);
|
||||||
|
@ -301,13 +318,13 @@ public class TestCapacitySchedulerQueueMappingFactory {
|
||||||
|
|
||||||
UserGroupMappingPlacementRule r =
|
UserGroupMappingPlacementRule r =
|
||||||
(UserGroupMappingPlacementRule) rules.get(0);
|
(UserGroupMappingPlacementRule) rules.get(0);
|
||||||
ApplicationPlacementContext ctx = r.getPlacementForApp(asc, "a");
|
ApplicationPlacementContext ctx = r.getPlacementForApp(asc, user);
|
||||||
assertEquals("Queue", "a", ctx.getQueue());
|
assertEquals("Queue", user, ctx.getQueue());
|
||||||
|
|
||||||
if (primary) {
|
if (primary) {
|
||||||
assertEquals("Primary Group", "agroup", ctx.getParentQueue());
|
assertEquals("Primary Group", user + "group", ctx.getParentQueue());
|
||||||
} else {
|
} else {
|
||||||
assertEquals("Secondary Group", "asubgroup1", ctx.getParentQueue());
|
assertEquals("Secondary Group", user + "subgroup1", ctx.getParentQueue());
|
||||||
}
|
}
|
||||||
mockRM.close();
|
mockRM.close();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue