diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 0ad480248ea..d0ee25df300 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -23,6 +23,7 @@ import org.apache.hadoop.thirdparty.com.google.common.base.Strings; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; import org.apache.hadoop.yarn.server.resourcemanager.placement.MappingRule; import org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.MappingRuleCreator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -63,6 +64,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -391,6 +393,19 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur "allow-zero-capacity-sum"; public static final boolean DEFAULT_ALLOW_ZERO_CAPACITY_SUM = false; + public static final String MAPPING_RULE_FORMAT = + PREFIX + "mapping-rule-format"; + public static final String MAPPING_RULE_JSON = + PREFIX + "mapping-rule-json"; + public static final String MAPPING_RULE_JSON_FILE = + PREFIX + "mapping-rule-json-file"; + + public static final String MAPPING_RULE_FORMAT_LEGACY = "legacy"; + public static final String MAPPING_RULE_FORMAT_JSON = "json"; + + public static final String MAPPING_RULE_FORMAT_DEFAULT = + MAPPING_RULE_FORMAT_LEGACY; + /** * Different resource types supported. */ @@ -1168,7 +1183,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur return mappings; } - public List getMappingRules() { + public List parseLegacyMappingRules() { List mappings = new ArrayList(); Collection mappingsString = getTrimmedStringCollection(QUEUE_MAPPING); @@ -1208,6 +1223,53 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur return mappings; } + public List parseJSONMappingRules() throws IOException { + String mappingJson = get(MAPPING_RULE_JSON, ""); + String mappingJsonFile = get(MAPPING_RULE_JSON_FILE, ""); + MappingRuleCreator creator = new MappingRuleCreator(); + + if (!mappingJson.equals("")) { + LOG.info("Reading mapping rules from provided inline JSON '{}'.", + mappingJson); + try { + return creator.getMappingRulesFromString(mappingJson); + } catch (IOException e) { + LOG.error("Error parsing mapping rule inline JSON."); + throw e; + } + } else if (!mappingJsonFile.equals("")) { + LOG.info("Reading mapping rules from JSON file '{}'.", + mappingJsonFile); + try { + return creator.getMappingRulesFromFile(mappingJsonFile.trim()); + } catch (IOException e) { + LOG.error("Error reading or parsing mapping rule JSON file '{}'.", + mappingJsonFile); + throw e; + } + } else { + LOG.warn("Mapping rule is set to JSON, but no inline JSON nor a JSON " + + "file was provided! Starting with no mapping rules!"); + } + + return new ArrayList<>(); + } + + public List getMappingRules() throws IOException { + String mappingFormat = + get(MAPPING_RULE_FORMAT, MAPPING_RULE_FORMAT_DEFAULT); + if (mappingFormat.equals(MAPPING_RULE_FORMAT_LEGACY)) { + return parseLegacyMappingRules(); + } else if (mappingFormat.equals(MAPPING_RULE_FORMAT_JSON)) { + return parseJSONMappingRules(); + } else { + throw new IllegalArgumentException( + "Illegal queue mapping format '" + mappingFormat + "' please use '" + + MAPPING_RULE_FORMAT_LEGACY + "' or '" + MAPPING_RULE_FORMAT_JSON + + "'"); + } + } + @Private @VisibleForTesting public void setQueuePlacementRules(Collection queuePlacementRules) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/placement/MappingRuleCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/placement/MappingRuleCreator.java index 57fc7fe18e8..11eb5bff679 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/placement/MappingRuleCreator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/placement/MappingRuleCreator.java @@ -50,9 +50,9 @@ public class MappingRuleCreator { private static final String ALL_USER = "*"; private static Logger LOG = LoggerFactory.getLogger(MappingRuleCreator.class); - public MappingRulesDescription getMappingRulesFromJson(String jsonPath) + public MappingRulesDescription getMappingRulesFromJsonFile(String filePath) throws IOException { - byte[] fileContents = Files.readAllBytes(Paths.get(jsonPath)); + byte[] fileContents = Files.readAllBytes(Paths.get(filePath)); return getMappingRulesFromJson(fileContents); } @@ -62,8 +62,21 @@ public class MappingRuleCreator { return objectMapper.readValue(contents, MappingRulesDescription.class); } - public List getMappingRules(String jsonPath) throws IOException { - MappingRulesDescription desc = getMappingRulesFromJson(jsonPath); + MappingRulesDescription getMappingRulesFromJson(String contents) + throws IOException { + ObjectMapper objectMapper = new ObjectMapper(); + return objectMapper.readValue(contents, MappingRulesDescription.class); + } + + public List getMappingRulesFromFile(String jsonPath) + throws IOException { + MappingRulesDescription desc = getMappingRulesFromJsonFile(jsonPath); + return getMappingRules(desc); + } + + public List getMappingRulesFromString(String json) + throws IOException { + MappingRulesDescription desc = getMappingRulesFromJson(json); return getMappingRules(desc); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestCSMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestCSMappingPlacementRule.java index 8f435dd19b1..6ee7b5df617 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestCSMappingPlacementRule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestCSMappingPlacementRule.java @@ -28,17 +28,26 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager; import org.apache.hadoop.yarn.util.Records; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; -import static junit.framework.TestCase.*; +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertNotNull; +import static junit.framework.TestCase.assertNull; +import static junit.framework.TestCase.assertTrue; +import static junit.framework.TestCase.fail; import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.DOT; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.mock; @@ -47,6 +56,10 @@ import static org.mockito.Mockito.when; public class TestCSMappingPlacementRule { private static final Logger LOG = LoggerFactory .getLogger(TestCSMappingPlacementRule.class); + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + private Map> userGroups = ImmutableMap.of( "alice", ImmutableSet.of("p_alice", "user", "developer"), "bob", ImmutableSet.of("p_bob", "user", "developer"), @@ -444,4 +457,105 @@ public class TestCSMappingPlacementRule { "developer nor in the p_alice group", engine, app, "charlie", "root.man.user"); } + + void assertConfigTestResult(List rules) { + assertEquals("We only specified one rule", 1, rules.size()); + MappingRule rule = rules.get(0); + String ruleStr = rule.toString(); + assertTrue("Rule's matcher variable should be %user", + ruleStr.contains("variable='%user'")); + assertTrue("Rule's match value should be bob", + ruleStr.contains("value='bob'")); + assertTrue("Rule's action should be place to queue", ruleStr.contains( + "action=PlaceToQueueAction{queueName='%primary_group'}")); + } + + @Test + public void testLegacyConfiguration() throws IOException { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + conf.set(CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT, + CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT_LEGACY); + conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, + "u:bob:%primary_group"); + + List rules = conf.getMappingRules(); + assertConfigTestResult(rules); + } + + @Test + public void testJSONConfiguration() throws IOException { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + conf.set(CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT, + CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT_JSON); + conf.set(CapacitySchedulerConfiguration.MAPPING_RULE_JSON, + "{\"rules\": [{" + + " \"type\": \"user\"," + + " \"matches\": \"bob\"," + + " \"policy\": \"custom\"," + + " \"customPlacement\": \"%primary_group\"," + + " \"fallbackResult\":\"skip\"" + + "}]}"); + + List rules = conf.getMappingRules(); + assertConfigTestResult(rules); + } + + @Test + public void testEmptyJSONConfiguration() throws IOException { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + conf.set(CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT, + CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT_JSON); + conf.set(CapacitySchedulerConfiguration.MAPPING_RULE_JSON, ""); + + List rules = conf.getMappingRules(); + assertEquals("We expect no rules", 0, rules.size()); + } + + @Test(expected = IOException.class) + public void testInvalidJSONConfiguration() throws IOException { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + conf.set(CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT, + CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT_JSON); + conf.set(CapacitySchedulerConfiguration.MAPPING_RULE_JSON, + "I'm a bad JSON, since I'm not a JSON."); + List rules = conf.getMappingRules(); + } + + @Test(expected = IOException.class) + public void testMissingJSONFileConfiguration() throws IOException { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + conf.set(CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT, + CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT_JSON); + conf.set(CapacitySchedulerConfiguration.MAPPING_RULE_JSON_FILE, + "/dev/null/nofile"); + List rules = conf.getMappingRules(); + } + + @Test + public void testJSONFileConfiguration() throws IOException { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + conf.set(CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT, + CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT_JSON); + + File jsonFile = folder.newFile("testJSONFileConfiguration.json"); + + BufferedWriter writer = new BufferedWriter(new FileWriter(jsonFile)); + try { + writer.write("{\"rules\": [{" + + " \"type\": \"user\"," + + " \"matches\": \"bob\"," + + " \"policy\": \"custom\"," + + " \"customPlacement\": \"%primary_group\"," + + " \"fallbackResult\":\"skip\"" + + "}]}"); + } finally { + writer.close(); + } + + conf.set(CapacitySchedulerConfiguration.MAPPING_RULE_JSON_FILE, + jsonFile.getAbsolutePath()); + List rules = conf.getMappingRules(); + + assertConfigTestResult(rules); + } } \ No newline at end of file