YARN-10635. CSMapping rule can return paths with empty parts. Contributed by Gergely Pollak.
This commit is contained in:
parent
25af8901a9
commit
4383726d19
@ -194,6 +194,7 @@ private void setupGroupsForVariableContext(VariableContext vctx, String user)
|
|||||||
String secondaryGroup = null;
|
String secondaryGroup = null;
|
||||||
Iterator<String> it = groupsSet.iterator();
|
Iterator<String> it = groupsSet.iterator();
|
||||||
String primaryGroup = it.next();
|
String primaryGroup = it.next();
|
||||||
|
|
||||||
while (it.hasNext()) {
|
while (it.hasNext()) {
|
||||||
String group = it.next();
|
String group = it.next();
|
||||||
if (this.queueManager.getQueue(group) != null) {
|
if (this.queueManager.getQueue(group) != null) {
|
||||||
@ -203,8 +204,7 @@ private void setupGroupsForVariableContext(VariableContext vctx, String user)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (secondaryGroup == null && LOG.isDebugEnabled()) {
|
if (secondaryGroup == null && LOG.isDebugEnabled()) {
|
||||||
LOG.debug("User {} is not associated with any Secondary " +
|
LOG.debug("User {} is not associated with any Secondary group", user);
|
||||||
"Group. Hence it may use the 'default' queue", user);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
vctx.put("%primary_group", primaryGroup);
|
vctx.put("%primary_group", primaryGroup);
|
||||||
@ -223,7 +223,15 @@ private VariableContext createVariableContext(
|
|||||||
//To place queues specifically to default, users must use root.default
|
//To place queues specifically to default, users must use root.default
|
||||||
if (!asc.getQueue().equals(YarnConfiguration.DEFAULT_QUEUE_NAME)) {
|
if (!asc.getQueue().equals(YarnConfiguration.DEFAULT_QUEUE_NAME)) {
|
||||||
vctx.put("%specified", asc.getQueue());
|
vctx.put("%specified", asc.getQueue());
|
||||||
|
} else {
|
||||||
|
//Adding specified as empty will prevent it to be undefined and it won't
|
||||||
|
//try to place the application to a queue named '%specified', queue path
|
||||||
|
//validation will reject the empty path or the path with empty parts,
|
||||||
|
//so we sill still hit the fallback action of this rule if no queue
|
||||||
|
//is specified
|
||||||
|
vctx.put("%specified", "");
|
||||||
}
|
}
|
||||||
|
|
||||||
vctx.put("%application", asc.getApplicationName());
|
vctx.put("%application", asc.getApplicationName());
|
||||||
vctx.put("%default", "root.default");
|
vctx.put("%default", "root.default");
|
||||||
try {
|
try {
|
||||||
@ -239,6 +247,12 @@ private VariableContext createVariableContext(
|
|||||||
private String validateAndNormalizeQueue(
|
private String validateAndNormalizeQueue(
|
||||||
String queueName, boolean allowCreate) throws YarnException {
|
String queueName, boolean allowCreate) throws YarnException {
|
||||||
MappingQueuePath path = new MappingQueuePath(queueName);
|
MappingQueuePath path = new MappingQueuePath(queueName);
|
||||||
|
|
||||||
|
if (path.hasEmptyPart()) {
|
||||||
|
throw new YarnException("Invalid path returned by rule: '" +
|
||||||
|
queueName + "'");
|
||||||
|
}
|
||||||
|
|
||||||
String leaf = path.getLeafName();
|
String leaf = path.getLeafName();
|
||||||
String parent = path.getParent();
|
String parent = path.getParent();
|
||||||
|
|
||||||
@ -335,14 +349,19 @@ private MappingRuleResult evaluateRule(
|
|||||||
MappingRule rule, VariableContext variables) {
|
MappingRule rule, VariableContext variables) {
|
||||||
MappingRuleResult result = rule.evaluate(variables);
|
MappingRuleResult result = rule.evaluate(variables);
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Evaluated rule '{}' with result: '{}'", rule, result);
|
||||||
|
}
|
||||||
|
|
||||||
if (result.getResult() == MappingRuleResultType.PLACE) {
|
if (result.getResult() == MappingRuleResultType.PLACE) {
|
||||||
try {
|
try {
|
||||||
result.updateNormalizedQueue(validateAndNormalizeQueue(
|
result.updateNormalizedQueue(validateAndNormalizeQueue(
|
||||||
result.getQueue(), result.isCreateAllowed()));
|
result.getQueue(), result.isCreateAllowed()));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.info("Cannot place to queue '{}' returned by mapping rule. " +
|
|
||||||
"Reason: {}", result.getQueue(), e.getMessage());
|
|
||||||
result = rule.getFallback();
|
result = rule.getFallback();
|
||||||
|
LOG.info("Cannot place to queue '{}' returned by mapping rule. " +
|
||||||
|
"Reason: '{}' Fallback operation: '{}'",
|
||||||
|
result.getQueue(), e.getMessage(), result);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -451,6 +470,12 @@ public ApplicationPlacementContext getPlacementForApp(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Placement final result '{}' for application '{}'",
|
||||||
|
(ret == null ? "null" : ret.getFullQueuePath()),
|
||||||
|
asc.getApplicationId());
|
||||||
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -69,6 +69,21 @@ private void setFromFullPath(String fullPath) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple helper method to determine if the path contains any empty parts.
|
||||||
|
* @return true if there is at least one empty part of the path
|
||||||
|
*/
|
||||||
|
public boolean hasEmptyPart() {
|
||||||
|
String[] parts = getFullPath().split("\\.");
|
||||||
|
for (int i = 0; i < parts.length; i++) {
|
||||||
|
if (parts[i].equals("")) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Getter for the parent part of the path.
|
* Getter for the parent part of the path.
|
||||||
* @return Parent path of the queue, null if there is no parent.
|
* @return Parent path of the queue, null if there is no parent.
|
||||||
|
@ -66,13 +66,15 @@ public static class PlaceToQueueAction extends MappingRuleActionBase {
|
|||||||
* This method is the main logic of the action, it will replace all the
|
* This method is the main logic of the action, it will replace all the
|
||||||
* variables in the queuePattern with their respective values, then returns
|
* variables in the queuePattern with their respective values, then returns
|
||||||
* a placementResult with the final queue name.
|
* a placementResult with the final queue name.
|
||||||
|
*
|
||||||
* @param variables The variable context, which contains all the variables
|
* @param variables The variable context, which contains all the variables
|
||||||
* @return The result of the action
|
* @return The result of the action
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public MappingRuleResult execute(VariableContext variables) {
|
public MappingRuleResult execute(VariableContext variables) {
|
||||||
String substituted = variables.replacePathVariables(queuePattern);
|
String substituted = variables.replacePathVariables(queuePattern);
|
||||||
return MappingRuleResult.createPlacementResult(substituted, allowCreate);
|
return MappingRuleResult.createPlacementResult(
|
||||||
|
substituted, allowCreate);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -64,7 +64,7 @@ public class TestCSMappingPlacementRule {
|
|||||||
"alice", ImmutableSet.of("p_alice", "user", "developer"),
|
"alice", ImmutableSet.of("p_alice", "user", "developer"),
|
||||||
"bob", ImmutableSet.of("p_bob", "user", "developer"),
|
"bob", ImmutableSet.of("p_bob", "user", "developer"),
|
||||||
"charlie", ImmutableSet.of("p_charlie", "user", "tester"),
|
"charlie", ImmutableSet.of("p_charlie", "user", "tester"),
|
||||||
"dave", ImmutableSet.of("user", "tester"),
|
"dave", ImmutableSet.of("user"),
|
||||||
"emily", ImmutableSet.of("user", "tester", "developer")
|
"emily", ImmutableSet.of("user", "tester", "developer")
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -90,6 +90,7 @@ private void createQueueHierarchy(CapacitySchedulerQueueManager queueManager) {
|
|||||||
.withQueue("root.disambiguous.deep.disambiuser.disambi")
|
.withQueue("root.disambiguous.deep.disambiuser.disambi")
|
||||||
.withQueue("root.disambiguous.deep.group.developer")
|
.withQueue("root.disambiguous.deep.group.developer")
|
||||||
.withManagedParentQueue("root.disambiguous.deep.dman")
|
.withManagedParentQueue("root.disambiguous.deep.dman")
|
||||||
|
.withDynamicParentQueue("root.dynamic")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
when(queueManager.getQueue(isNull())).thenReturn(null);
|
when(queueManager.getQueue(isNull())).thenReturn(null);
|
||||||
@ -151,8 +152,9 @@ private ApplicationSubmissionContext createApp(String name) {
|
|||||||
private void assertReject(String message, CSMappingPlacementRule engine,
|
private void assertReject(String message, CSMappingPlacementRule engine,
|
||||||
ApplicationSubmissionContext asc, String user) {
|
ApplicationSubmissionContext asc, String user) {
|
||||||
try {
|
try {
|
||||||
engine.getPlacementForApp(asc, user);
|
ApplicationPlacementContext apc = engine.getPlacementForApp(asc, user);
|
||||||
fail(message);
|
fail("Unexpected queue result: " + apc.getFullQueuePath() + " - " +
|
||||||
|
message);
|
||||||
} catch (YarnException e) {
|
} catch (YarnException e) {
|
||||||
//To prevent PlacementRule chaining present in PlacementManager
|
//To prevent PlacementRule chaining present in PlacementManager
|
||||||
//when an application is rejected an exception is thrown to make sure
|
//when an application is rejected an exception is thrown to make sure
|
||||||
@ -483,14 +485,36 @@ private MappingRule createGroupMapping(String group, String queue) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGroupMatching() throws IOException {
|
public void testGroupTargetMatching() throws IOException {
|
||||||
ArrayList<MappingRule> rules = new ArrayList<>();
|
ArrayList<MappingRule> rules = new ArrayList<>();
|
||||||
|
|
||||||
rules.add(createGroupMapping("p_alice", "root.man.p_alice"));
|
rules.add(
|
||||||
rules.add(createGroupMapping("developer", "root.man.developer"));
|
new MappingRule(
|
||||||
|
MappingRuleMatchers.createUserMatcher("alice"),
|
||||||
|
(new MappingRuleActions.PlaceToQueueAction(
|
||||||
|
"root.man.%primary_group", true))
|
||||||
|
.setFallbackReject()));
|
||||||
|
|
||||||
//everybody is in the user group, this should catch all
|
rules.add(
|
||||||
rules.add(createGroupMapping("user", "root.man.user"));
|
new MappingRule(
|
||||||
|
MappingRuleMatchers.createUserMatcher("bob"),
|
||||||
|
(new MappingRuleActions.PlaceToQueueAction(
|
||||||
|
"root.dynamic.%secondary_group.%user", true))
|
||||||
|
.setFallbackReject()));
|
||||||
|
|
||||||
|
rules.add(
|
||||||
|
new MappingRule(
|
||||||
|
MappingRuleMatchers.createUserMatcher("charlie"),
|
||||||
|
(new MappingRuleActions.PlaceToQueueAction(
|
||||||
|
"root.man.%secondary_group", true))
|
||||||
|
.setFallbackReject()));
|
||||||
|
|
||||||
|
rules.add(
|
||||||
|
new MappingRule(
|
||||||
|
MappingRuleMatchers.createUserMatcher("dave"),
|
||||||
|
(new MappingRuleActions.PlaceToQueueAction(
|
||||||
|
"root.dynamic.%secondary_group.%user", true))
|
||||||
|
.setFallbackReject()));
|
||||||
|
|
||||||
CSMappingPlacementRule engine = setupEngine(true, rules);
|
CSMappingPlacementRule engine = setupEngine(true, rules);
|
||||||
ApplicationSubmissionContext app = createApp("app");
|
ApplicationSubmissionContext app = createApp("app");
|
||||||
@ -499,12 +523,15 @@ public void testGroupMatching() throws IOException {
|
|||||||
"Alice should be placed to root.man.p_alice based on her primary group",
|
"Alice should be placed to root.man.p_alice based on her primary group",
|
||||||
engine, app, "alice", "root.man.p_alice");
|
engine, app, "alice", "root.man.p_alice");
|
||||||
assertPlace(
|
assertPlace(
|
||||||
"Bob should be placed to root.man.developer based on his developer " +
|
"Bob should be placed to root.dynamic.developer.bob based on his " +
|
||||||
"group", engine, app, "bob", "root.man.developer");
|
"secondary group, since we have a queue named 'developer', bob " +
|
||||||
assertPlace(
|
"identifies as a user with secondary_group 'developer'", engine, app,
|
||||||
"Charlie should be placed to root.man.user because he is not a " +
|
"bob", "root.dynamic.developer.bob");
|
||||||
"developer nor in the p_alice group", engine, app, "charlie",
|
assertReject("Charlie should get rejected because he neither of his" +
|
||||||
"root.man.user");
|
"groups have an ambiguous queue, so effectively he has no secondary " +
|
||||||
|
"group", engine, app, "charlie");
|
||||||
|
assertReject("Dave should get rejected because he has no secondary group",
|
||||||
|
engine, app, "dave");
|
||||||
}
|
}
|
||||||
|
|
||||||
void assertConfigTestResult(List<MappingRule> rules) {
|
void assertConfigTestResult(List<MappingRule> rules) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user