YARN-10387. Implement logic which returns MappingRule objects based on mapping rules. Contributed by Peter Bacsko

This commit is contained in:
Szilard Nemeth 2020-09-09 15:12:58 +02:00
parent 1d6d0d8207
commit 773ac799c6
2 changed files with 673 additions and 0 deletions

View File

@ -0,0 +1,218 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement;
import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.server.resourcemanager.placement.MappingRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.MappingRuleAction;
import org.apache.hadoop.yarn.server.resourcemanager.placement.MappingRuleActions;
import org.apache.hadoop.yarn.server.resourcemanager.placement.MappingRuleMatcher;
import org.apache.hadoop.yarn.server.resourcemanager.placement.MappingRuleMatchers;
// These are generated classes - use GeneratePojos class to create them
// if they are missing
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.MappingRulesDescription;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.Rule;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.Rule.FallbackResult;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.Rule.Policy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.Rule.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
public class MappingRuleCreator {
private static final String ALL_USER = "*";
private static Logger LOG = LoggerFactory.getLogger(MappingRuleCreator.class);
public MappingRulesDescription getMappingRulesFromJson(String jsonPath)
throws IOException {
byte[] fileContents = Files.readAllBytes(Paths.get(jsonPath));
return getMappingRulesFromJson(fileContents);
}
MappingRulesDescription getMappingRulesFromJson(byte[] contents)
throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(contents, MappingRulesDescription.class);
}
public List<MappingRule> getMappingRules(String jsonPath) throws IOException {
MappingRulesDescription desc = getMappingRulesFromJson(jsonPath);
return getMappingRules(desc);
}
@VisibleForTesting
List<MappingRule> getMappingRules(MappingRulesDescription rules) {
List<MappingRule> mappingRules = new ArrayList<>();
for (Rule rule : rules.getRules()) {
checkMandatoryParameters(rule);
MappingRuleMatcher matcher = createMatcher(rule);
MappingRuleAction action = createAction(rule);
setFallbackToAction(rule, action);
MappingRule mappingRule = new MappingRule(matcher, action);
mappingRules.add(mappingRule);
}
return mappingRules;
}
private MappingRuleMatcher createMatcher(Rule rule) {
String matches = rule.getMatches();
Type type = rule.getType();
MappingRuleMatcher matcher = null;
switch (type) {
case USER:
if (ALL_USER.equals(matches)) {
matcher = MappingRuleMatchers.createAllMatcher();
} else {
matcher = MappingRuleMatchers.createUserMatcher(matches);
}
break;
case GROUP:
checkArgument(!ALL_USER.equals(matches), "Cannot match '*' for groups");
matcher = MappingRuleMatchers.createUserGroupMatcher(matches);
break;
case APPLICATION:
matcher = MappingRuleMatchers.createApplicationNameMatcher(matches);
break;
default:
throw new IllegalArgumentException("Unknown type: " + type);
}
return matcher;
}
private MappingRuleAction createAction(Rule rule) {
Policy policy = rule.getPolicy();
String queue = rule.getParentQueue();
boolean create;
if (rule.getCreate() == null) {
LOG.debug("Create flag is not set for rule {},"
+ "using \"true\" as default", rule);
create = true;
} else {
create = rule.getCreate();
}
MappingRuleAction action = null;
switch (policy) {
case DEFAULT_QUEUE:
action = MappingRuleActions.createPlaceToDefaultAction();
break;
case REJECT:
action = MappingRuleActions.createRejectAction();
break;
case PRIMARY_GROUP:
action = MappingRuleActions.createPlaceToQueueAction(
getTargetQueue(queue, "%primary_group"), create);
break;
case SECONDARY_GROUP:
action = MappingRuleActions.createPlaceToQueueAction(
getTargetQueue(queue, "%secondary_group"), create);
break;
case PRIMARY_GROUP_USER:
action = MappingRuleActions.createPlaceToQueueAction(
getTargetQueue(rule.getParentQueue(),
"%primary_group.%user"), create);
break;
case SECONDARY_GROUP_USER:
action = MappingRuleActions.createPlaceToQueueAction(
getTargetQueue(rule.getParentQueue(),
"%secondary_group.%user"), create);
break;
case SPECIFIED:
action = MappingRuleActions.createPlaceToQueueAction("%specified",
create);
break;
case CUSTOM:
String customTarget = rule.getCustomPlacement();
checkArgument(customTarget != null, "custom queue is undefined");
action = MappingRuleActions.createPlaceToQueueAction(customTarget,
create);
break;
case USER:
action = MappingRuleActions.createPlaceToQueueAction(
getTargetQueue(rule.getParentQueue(),
"%user"), create);
break;
case SET_DEFAULT_QUEUE:
String defaultQueue = rule.getValue();
checkArgument(defaultQueue != null, "default queue is undefined");
action = MappingRuleActions.createUpdateDefaultAction(defaultQueue);
break;
default:
throw new IllegalArgumentException(
"Unsupported policy: " + policy);
}
return action;
}
private void setFallbackToAction(Rule rule, MappingRuleAction action) {
FallbackResult fallbackResult = rule.getFallbackResult();
if (fallbackResult == null) {
action.setFallbackSkip();
LOG.debug("Fallback is not defined for rule {}, using SKIP as default", rule);
return;
}
switch (fallbackResult) {
case PLACE_DEFAULT:
action.setFallbackDefaultPlacement();
break;
case REJECT:
action.setFallbackReject();
break;
case SKIP:
action.setFallbackSkip();
break;
default:
throw new IllegalArgumentException(
"Unsupported fallback rule " + fallbackResult);
}
}
private String getTargetQueue(String parent, String placeholder) {
return (parent == null) ? placeholder : parent + "." + placeholder;
}
private void checkMandatoryParameters(Rule rule) {
checkArgument(rule.getPolicy() != null, "Rule policy is undefined");
checkArgument(rule.getType() != null, "Rule type is undefined");
checkArgument(rule.getMatches() != null, "Match string is undefined");
checkArgument(!StringUtils.isEmpty(rule.getMatches()),
"Match string is empty");
}
}

View File

@ -0,0 +1,455 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.yarn.server.resourcemanager.placement.MappingRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.MappingRuleResult;
import org.apache.hadoop.yarn.server.resourcemanager.placement.MappingRuleResultType;
import org.apache.hadoop.yarn.server.resourcemanager.placement.VariableContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.MappingRulesDescription;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.Rule;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.Rule.FallbackResult;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.Rule.Policy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.Rule.Type;
import org.assertj.core.util.Sets;
import org.junit.Before;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class TestMappingRuleCreator {
private static final String MATCH_ALL = "*";
private static final String DEFAULT_QUEUE = "root.default";
private static final String SECONDARY_GROUP = "users";
private static final String PRIMARY_GROUP = "superuser";
private static final String APPLICATION_NAME = "MapReduce";
private static final String SPECIFIED_QUEUE = "root.users.hadoop";
private static final String USER_NAME = "testuser";
private MappingRuleCreator ruleCreator;
private VariableContext variableContext;
private MappingRulesDescription description;
private Rule rule;
@org.junit.Rule
public ExpectedException expected = ExpectedException.none();
@Before
public void setup() {
ruleCreator = new MappingRuleCreator();
prepareMappingRuleDescription();
variableContext = new VariableContext();
variableContext.put("%user", USER_NAME);
variableContext.put("%specified", SPECIFIED_QUEUE);
variableContext.put("%application", APPLICATION_NAME);
variableContext.put("%primary_group", PRIMARY_GROUP);
variableContext.put("%secondary_group", SECONDARY_GROUP);
variableContext.put("%default", DEFAULT_QUEUE);
variableContext.putExtraDataset("groups",
Sets.newLinkedHashSet(PRIMARY_GROUP, SECONDARY_GROUP));
}
@Test
public void testAllUserMatcher() {
variableContext.put("%user", USER_NAME);
verifyPlacementSucceeds(USER_NAME);
variableContext.put("%user", "dummyuser");
verifyPlacementSucceeds("dummyuser");
}
@Test
public void testSpecificUserMatcherPasses() {
rule.setMatches(USER_NAME);
verifyPlacementSucceeds(USER_NAME);
}
@Test
public void testSpecificUserMatcherFails() {
rule.setMatches(USER_NAME);
variableContext.put("%user", "dummyuser");
verifyNoPlacementOccurs();
}
@Test
public void testSpecificGroupMatcher() {
rule.setMatches(PRIMARY_GROUP);
rule.setType(Type.GROUP);
verifyPlacementSucceeds();
}
@Test
public void testAllGroupMatcherFailsDueToMatchString() {
rule.setType(Type.GROUP);
expected.expect(IllegalArgumentException.class);
expected.expectMessage("Cannot match '*' for groups");
// fails because "*" is not applicable to group type
ruleCreator.getMappingRules(description);
}
@Test
public void testApplicationNameMatcherPasses() {
rule.setType(Type.APPLICATION);
rule.setMatches(APPLICATION_NAME);
verifyPlacementSucceeds();
}
@Test
public void testApplicationNameMatcherFails() {
rule.setType(Type.APPLICATION);
rule.setMatches("dummyApplication");
verifyNoPlacementOccurs();
}
@Test
public void testDefaultRule() {
rule.setPolicy(Policy.DEFAULT_QUEUE);
verifyPlacementSucceeds(DEFAULT_QUEUE, false);
}
@Test
public void testSpecifiedRule() {
rule.setPolicy(Policy.SPECIFIED);
verifyPlacementSucceeds(SPECIFIED_QUEUE);
}
@Test
public void testSpecifiedRuleWithNoCreate() {
rule.setPolicy(Policy.SPECIFIED);
rule.setCreate(false);
verifyPlacementSucceeds(SPECIFIED_QUEUE, false);
}
@Test
public void testRejectRule() {
rule.setPolicy(Policy.REJECT);
verifyPlacementRejected();
}
@Test
public void testSetDefaultRule() {
rule.setPolicy(Policy.SET_DEFAULT_QUEUE);
rule.setValue("root.users.default");
verifyNoPlacementOccurs();
assertEquals("Default queue", "root.users.default",
variableContext.get("%default"));
}
@Test
public void testSetDefaultRuleWithMissingQueue() {
rule.setPolicy(Policy.SET_DEFAULT_QUEUE);
expected.expect(IllegalArgumentException.class);
expected.expectMessage("default queue is undefined");
ruleCreator.getMappingRules(description);
}
@Test
public void testPrimaryGroupRule() {
rule.setPolicy(Policy.PRIMARY_GROUP);
verifyPlacementSucceeds(PRIMARY_GROUP);
}
@Test
public void testPrimaryGroupRuleWithNoCreate() {
rule.setPolicy(Policy.PRIMARY_GROUP);
rule.setCreate(false);
verifyPlacementSucceeds(PRIMARY_GROUP, false);
}
@Test
public void testPrimaryGroupRuleWithParent() {
rule.setPolicy(Policy.PRIMARY_GROUP);
rule.setParentQueue("root");
verifyPlacementSucceeds("root." + PRIMARY_GROUP);
}
@Test
public void testSecondaryGroupRule() {
rule.setPolicy(Policy.SECONDARY_GROUP);
verifyPlacementSucceeds(SECONDARY_GROUP);
}
@Test
public void testSecondaryGroupRuleWithNoCreate() {
rule.setPolicy(Policy.SECONDARY_GROUP);
rule.setCreate(false);
verifyPlacementSucceeds(SECONDARY_GROUP, false);
}
@Test
public void testSecondaryGroupRuleWithParent() {
rule.setPolicy(Policy.SECONDARY_GROUP);
rule.setParentQueue("root");
verifyPlacementSucceeds("root." + SECONDARY_GROUP);
}
@Test
public void testUserRule() {
rule.setPolicy(Policy.USER);
verifyPlacementSucceeds(USER_NAME);
}
@Test
public void testUserRuleWithParent() {
rule.setPolicy(Policy.USER);
rule.setParentQueue("root.users");
verifyPlacementSucceeds("root.users." + USER_NAME);
}
@Test
public void testCustomRule() {
rule.setPolicy(Policy.CUSTOM);
rule.setCustomPlacement("root.%primary_group.%secondary_group");
verifyPlacementSucceeds(
String.format("root.%s.%s", PRIMARY_GROUP, SECONDARY_GROUP));
}
@Test
public void testCustomRuleWithNoCreate() {
rule.setPolicy(Policy.CUSTOM);
rule.setCustomPlacement("root.%primary_group.%secondary_group");
rule.setCreate(false);
verifyPlacementSucceeds(
String.format("root.%s.%s", PRIMARY_GROUP, SECONDARY_GROUP), false);
}
@Test
public void testCustomRuleWithMissingQueue() {
rule.setPolicy(Policy.CUSTOM);
expected.expect(IllegalArgumentException.class);
expected.expectMessage("custom queue is undefined");
ruleCreator.getMappingRules(description);
}
@Test
public void testPrimaryGroupUserRule() {
rule.setPolicy(Policy.PRIMARY_GROUP_USER);
verifyPlacementSucceeds("superuser.testuser");
}
@Test
public void testPrimaryGroupUserRuleWithNoCreate() {
rule.setPolicy(Policy.PRIMARY_GROUP_USER);
rule.setCreate(false);
verifyPlacementSucceeds("superuser.testuser", false);
}
@Test
public void testPrimaryGroupNestedRuleWithParent() {
rule.setPolicy(Policy.PRIMARY_GROUP_USER);
rule.setParentQueue("root");
verifyPlacementSucceeds("root.superuser.testuser");
}
@Test
public void testSecondaryGroupNestedRule() {
rule.setPolicy(Policy.SECONDARY_GROUP_USER);
verifyPlacementSucceeds("users.testuser");
}
@Test
public void testSecondaryGroupNestedRuleWithNoCreate() {
rule.setPolicy(Policy.SECONDARY_GROUP_USER);
rule.setCreate(false);
verifyPlacementSucceeds("users.testuser", false);
}
@Test
public void testSecondaryGroupNestedRuleWithParent() {
rule.setPolicy(Policy.SECONDARY_GROUP_USER);
rule.setParentQueue("root");
verifyPlacementSucceeds("root.users.testuser");
}
@Test
public void testDefaultQueueFallback() {
rule.setFallbackResult(FallbackResult.PLACE_DEFAULT);
testFallback(MappingRuleResultType.PLACE_TO_DEFAULT);
}
@Test
public void testRejectFallback() {
rule.setFallbackResult(FallbackResult.REJECT);
testFallback(MappingRuleResultType.REJECT);
}
@Test
public void testSkipFallback() {
rule.setFallbackResult(FallbackResult.SKIP);
testFallback(MappingRuleResultType.SKIP);
}
private void testFallback(MappingRuleResultType expectedType) {
List<MappingRule> rules = ruleCreator.getMappingRules(description);
MappingRule mpr = rules.get(0);
assertEquals("Fallback result",
expectedType, mpr.getFallback().getResult());
}
@Test
public void testFallbackResultUnset() {
rule.setFallbackResult(null);
List<MappingRule> rules = ruleCreator.getMappingRules(description);
MappingRule mpr = rules.get(0);
assertEquals("Fallback result", MappingRuleResultType.SKIP,
mpr.getFallback().getResult());
}
@Test
public void testTypeUnset() {
rule.setType(null);
expected.expect(IllegalArgumentException.class);
expected.expectMessage("Rule type is undefined");
ruleCreator.getMappingRules(description);
}
@Test
public void testMatchesUnset() {
rule.setMatches(null);
expected.expect(IllegalArgumentException.class);
expected.expectMessage("Match string is undefined");
ruleCreator.getMappingRules(description);
}
@Test
public void testMatchesEmpty() {
rule.setMatches("");
expected.expect(IllegalArgumentException.class);
expected.expectMessage("Match string is empty");
ruleCreator.getMappingRules(description);
}
@Test
public void testPolicyUnset() {
rule.setPolicy(null);
expected.expect(IllegalArgumentException.class);
expected.expectMessage("Rule policy is undefined");
ruleCreator.getMappingRules(description);
}
private void prepareMappingRuleDescription() {
description = new MappingRulesDescription();
rule = new Rule();
rule.setType(Type.USER);
rule.setFallbackResult(FallbackResult.SKIP);
rule.setPolicy(Policy.USER);
rule.setMatches(MATCH_ALL);
List<Rule> rules = new ArrayList<>();
rules.add(rule);
description.setRules(rules);
}
private void verifyPlacementSucceeds() {
verifyPlacement(MappingRuleResultType.PLACE, null, true);
}
private void verifyPlacementSucceeds(String expectedQueue) {
verifyPlacement(MappingRuleResultType.PLACE, expectedQueue, true);
}
private void verifyPlacementSucceeds(String expectedQueue,
boolean allowCreate) {
verifyPlacement(MappingRuleResultType.PLACE, expectedQueue,
allowCreate);
}
private void verifyPlacementRejected() {
verifyPlacement(MappingRuleResultType.REJECT, null, true);
}
private void verifyNoPlacementOccurs() {
verifyPlacement(null, null, true);
}
private void verifyPlacement(MappingRuleResultType expectedResultType,
String expectedQueue, boolean allowCreate) {
List<MappingRule> rules = ruleCreator.getMappingRules(description);
assertEquals("Number of rules", 1, rules.size());
MappingRule mpr = rules.get(0);
MappingRuleResult result = mpr.evaluate(variableContext);
assertEquals("Create flag", allowCreate, result.isCreateAllowed());
if (expectedResultType != null) {
assertEquals("Mapping rule result",
expectedResultType, result.getResult());
} else {
assertEquals("Mapping rule result",
MappingRuleResultType.SKIP, result.getResult());
}
if (expectedQueue != null) {
assertEquals("Evaluated queue", expectedQueue, result.getQueue());
}
}
}