diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java index 93fd706b0c3..de9419ae619 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.util.constraint; +import com.google.common.base.Strings; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode; @@ -589,6 +590,14 @@ public final class PlacementConstraintParser { this.num = number; } + public static SourceTags emptySourceTags() { + return new SourceTags("", 0); + } + + public boolean isEmpty() { + return Strings.isNullOrEmpty(tag) && num == 0; + } + public String getTag() { return this.tag; } @@ -692,20 +701,47 @@ public final class PlacementConstraintParser { // foo=4,Pn String[] splitted = specStr.split( String.valueOf(EXPRESSION_VAL_DELIM), 2); - if (splitted.length != 2) { + final SourceTags st; + final String exprs; + if (splitted.length == 1) { + // source tags not specified + exprs = splitted[0]; + st = SourceTags.emptySourceTags(); + } else if (splitted.length == 2) { + exprs = splitted[1]; + String tagAlloc = splitted[0]; + st = SourceTags.parseFrom(tagAlloc); + } else { throw new PlacementConstraintParseException( "Unexpected placement constraint expression " + specStr); } - String tagAlloc = splitted[0]; - SourceTags st = SourceTags.parseFrom(tagAlloc); - String exprs = splitted[1]; AbstractConstraint constraint = PlacementConstraintParser.parseExpression(exprs); result.put(st, constraint.build()); } + // Validation + Set sourceTagSet = result.keySet(); + if (sourceTagSet.stream() + .filter(sourceTags -> sourceTags.isEmpty()) + .findAny() + .isPresent()) { + // Source tags, e.g foo=3, is optional for a node-attribute constraint, + // but when source tags is absent, the parser only accept single + // constraint expression to avoid ambiguous semantic. This is because + // DS AM is requesting number of containers per the number specified + // in the source tags, we do overwrite when there is no source tags + // with num_containers argument from commandline. If that is partially + // missed in the constraints, we don't know if it is ought to + // overwritten or not. + if (result.size() != 1) { + throw new PlacementConstraintParseException( + "Source allocation tags is required for a multi placement" + + " constraint expression."); + } + } return result; } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java index 9806ba4ac96..91e4fdb4e84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java @@ -501,5 +501,27 @@ public class TestPlacementConstraintParser { actualPc2 = valueIt.next(); Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr()); Assert.assertEquals(expectedPc2, actualPc2.getConstraintExpr()); + + // A single node attribute constraint w/o source tags + result = PlacementConstraintParser + .parsePlacementSpec("rm.yarn.io/foo=true"); + Assert.assertEquals(1, result.size()); + target = PlacementTargets.nodeAttribute("rm.yarn.io/foo", "true"); + expectedPc1 = targetNodeAttribute("node", NodeAttributeOpCode.EQ, target); + + SourceTags actualSourceTags = result.keySet().iterator().next(); + Assert.assertTrue(actualSourceTags.isEmpty()); + actualPc1 = result.values().iterator().next(); + Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr()); + + // If source tags is not specified for a node-attribute constraint, + // then this expression must be single constraint expression. + try { + PlacementConstraintParser + .parsePlacementSpec("rm.yarn.io/foo=true:xyz=1,notin,node,xyz"); + Assert.fail("Expected a failure!"); + } catch (Exception e) { + Assert.assertTrue(e instanceof PlacementConstraintParseException); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 09a796e4e63..a04f57b2704 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.Arrays; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.base.Strings; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; @@ -527,7 +528,9 @@ public class ApplicationMaster { LOG.info("Placement Spec received [{}]", decodedSpec); this.numTotalContainers = 0; - parsePlacementSpecs(decodedSpec); + int globalNumOfContainers = Integer + .parseInt(cliParser.getOptionValue("num_containers", "0")); + parsePlacementSpecs(decodedSpec, globalNumOfContainers); LOG.info("Total num containers requested [{}]", numTotalContainers); if (numTotalContainers == 0) { @@ -698,11 +701,19 @@ public class ApplicationMaster { return true; } - private void parsePlacementSpecs(String decodedSpec) { + private void parsePlacementSpecs(String decodedSpec, + int globalNumOfContainers) { Map pSpecs = PlacementSpec.parse(decodedSpec); this.placementSpecs = new HashMap<>(); for (PlacementSpec pSpec : pSpecs.values()) { + // Use global num of containers when the spec doesn't specify + // source tags. This is allowed when using node-attribute constraints. + if (Strings.isNullOrEmpty(pSpec.sourceTag) + && pSpec.getNumContainers() == 0 + && globalNumOfContainers > 0) { + pSpec.setNumContainers(globalNumOfContainers); + } this.numTotalContainers += pSpec.getNumContainers(); this.placementSpecs.put(pSpec.sourceTag, pSpec); } @@ -799,8 +810,9 @@ public class ApplicationMaster { placementConstraintMap = new HashMap<>(); for (PlacementSpec spec : this.placementSpecs.values()) { if (spec.constraint != null) { - placementConstraintMap.put( - Collections.singleton(spec.sourceTag), spec.constraint); + Set allocationTags = Strings.isNullOrEmpty(spec.sourceTag) ? + Collections.emptySet() : Collections.singleton(spec.sourceTag); + placementConstraintMap.put(allocationTags, spec.constraint); } } }