YARN-7838. Support AND/OR constraints in Distributed Shell. Contributed by Weiwei Yang.

This commit is contained in:
Weiwei Yang 2018-02-11 14:20:46 +08:00
parent 25fbec67d1
commit a08c048832
7 changed files with 1075 additions and 66 deletions

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.util.constraint;
/**
* Exception when the placement constraint parser fails to parse an expression.
*/
public class PlacementConstraintParseException extends Exception {
public PlacementConstraintParseException(String msg) {
super(msg);
}
}

View File

@ -0,0 +1,615 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.util.constraint;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import java.util.ArrayList;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.StringTokenizer;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.Stack;
import java.util.Set;
import java.util.TreeSet;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Placement constraint expression parser.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public final class PlacementConstraintParser {
private static final char EXPRESSION_DELIM = ':';
private static final char KV_SPLIT_DELIM = '=';
private static final char EXPRESSION_VAL_DELIM = ',';
private static final char BRACKET_START = '(';
private static final char BRACKET_END = ')';
private static final String IN = "in";
private static final String NOT_IN = "notin";
private static final String AND = "and";
private static final String OR = "or";
private static final String CARDINALITY = "cardinality";
private static final String SCOPE_NODE = PlacementConstraints.NODE;
private static final String SCOPE_RACK = PlacementConstraints.RACK;
private PlacementConstraintParser() {
// Private constructor for this utility class.
}
/**
* Constraint Parser used to parse placement constraints from a
* given expression.
*/
public static abstract class ConstraintParser {
private final ConstraintTokenizer tokenizer;
public ConstraintParser(ConstraintTokenizer tk){
this.tokenizer = tk;
}
void validate() throws PlacementConstraintParseException {
tokenizer.validate();
}
void shouldHaveNext()
throws PlacementConstraintParseException {
if (!tokenizer.hasMoreElements()) {
throw new PlacementConstraintParseException("Expecting more tokens");
}
}
String nextToken() {
return this.tokenizer.nextElement().trim();
}
boolean hasMoreTokens() {
return this.tokenizer.hasMoreElements();
}
int toInt(String name) throws PlacementConstraintParseException {
try {
return Integer.parseInt(name);
} catch (NumberFormatException e) {
throw new PlacementConstraintParseException(
"Expecting an Integer, but get " + name);
}
}
String parseScope(String scopeString)
throws PlacementConstraintParseException {
if (scopeString.equalsIgnoreCase(SCOPE_NODE)) {
return SCOPE_NODE;
} else if (scopeString.equalsIgnoreCase(SCOPE_RACK)) {
return SCOPE_RACK;
} else {
throw new PlacementConstraintParseException(
"expecting scope to " + SCOPE_NODE + " or " + SCOPE_RACK
+ ", but met " + scopeString);
}
}
public AbstractConstraint tryParse() {
try {
return parse();
} catch (PlacementConstraintParseException e) {
// unable to parse, simply return null
return null;
}
}
public abstract AbstractConstraint parse()
throws PlacementConstraintParseException;
}
/**
* Tokenizer interface that used to parse an expression. It first
* validates if the syntax of the given expression is valid, then traverse
* the expression and parse it to an enumeration of strings. Each parsed
* string can be further consumed by a {@link ConstraintParser} and
* transformed to a {@link AbstractConstraint}.
*/
public interface ConstraintTokenizer extends Enumeration<String> {
/**
* Validate the schema before actual parsing the expression.
* @throws PlacementConstraintParseException
*/
default void validate() throws PlacementConstraintParseException {
// do nothing
}
}
/**
* A basic tokenizer that splits an expression by a given delimiter.
*/
public static class BaseStringTokenizer implements ConstraintTokenizer {
private final StringTokenizer tokenizer;
BaseStringTokenizer(String expr, String delimiter) {
this.tokenizer = new StringTokenizer(expr, delimiter);
}
@Override
public boolean hasMoreElements() {
return tokenizer.hasMoreTokens();
}
@Override
public String nextElement() {
return tokenizer.nextToken();
}
}
/**
* Tokenizer used to parse conjunction form of a constraint expression,
* [AND|OR](C1:C2:...:Cn). Each Cn is a constraint expression.
*/
public static final class ConjunctionTokenizer
implements ConstraintTokenizer {
private final String expression;
private Iterator<String> iterator;
private ConjunctionTokenizer(String expr) {
this.expression = expr;
}
// Traverse the expression and try to get a list of parsed elements
// based on schema.
@Override
public void validate() throws PlacementConstraintParseException {
List<String> parsedElements = new ArrayList<>();
// expression should start with AND or OR
String op;
if (expression.startsWith(AND) ||
expression.startsWith(AND.toUpperCase())) {
op = AND;
} else if(expression.startsWith(OR) ||
expression.startsWith(OR.toUpperCase())) {
op = OR;
} else {
throw new PlacementConstraintParseException(
"Excepting starting with \"" + AND + "\" or \"" + OR + "\","
+ " but met " + expression);
}
parsedElements.add(op);
Pattern p = Pattern.compile("\\((.*)\\)");
Matcher m = p.matcher(expression);
if (!m.find()) {
throw new PlacementConstraintParseException("Unexpected format,"
+ " expecting [AND|OR](A:B...) "
+ "but current expression is " + expression);
}
String childStrs = m.group(1);
MultipleConstraintsTokenizer ct =
new MultipleConstraintsTokenizer(childStrs);
ct.validate();
while(ct.hasMoreElements()) {
parsedElements.add(ct.nextElement());
}
this.iterator = parsedElements.iterator();
}
@Override
public boolean hasMoreElements() {
return iterator.hasNext();
}
@Override
public String nextElement() {
return iterator.next();
}
}
/**
* Tokenizer used to parse allocation tags expression, which should be
* in tag=numOfAllocations syntax.
*/
public static class SourceTagsTokenizer implements ConstraintTokenizer {
private final String expression;
private StringTokenizer st;
private Iterator<String> iterator;
public SourceTagsTokenizer(String expr) {
this.expression = expr;
st = new StringTokenizer(expr, String.valueOf(KV_SPLIT_DELIM));
}
@Override
public void validate() throws PlacementConstraintParseException {
ArrayList<String> parsedValues = new ArrayList<>();
if (st.countTokens() != 2) {
throw new PlacementConstraintParseException(
"Expecting source allocation tag to be specified"
+ " sourceTag=numOfAllocations syntax,"
+ " but met " + expression);
}
String sourceTag = st.nextToken();
parsedValues.add(sourceTag);
String num = st.nextToken();
try {
Integer.parseInt(num);
parsedValues.add(num);
} catch (NumberFormatException e) {
throw new PlacementConstraintParseException("Value of the expression"
+ " must be an integer, but met " + num);
}
iterator = parsedValues.iterator();
}
@Override
public boolean hasMoreElements() {
return iterator.hasNext();
}
@Override
public String nextElement() {
return iterator.next();
}
}
/**
* Tokenizer used to handle a placement spec composed by multiple
* constraint expressions. Each of them is delimited with the
* given delimiter, e.g ':'.
*/
public static class MultipleConstraintsTokenizer
implements ConstraintTokenizer {
private final String expr;
private Iterator<String> iterator;
public MultipleConstraintsTokenizer(String expression) {
this.expr = expression;
}
@Override
public void validate() throws PlacementConstraintParseException {
ArrayList<String> parsedElements = new ArrayList<>();
char[] arr = expr.toCharArray();
// Memorize the location of each delimiter in a stack,
// removes invalid delimiters that embraced in brackets.
Stack<Integer> stack = new Stack<>();
for (int i=0; i<arr.length; i++) {
char current = arr[i];
switch (current) {
case EXPRESSION_DELIM:
stack.add(i);
break;
case BRACKET_START:
stack.add(i);
break;
case BRACKET_END:
while(!stack.isEmpty()) {
if (arr[stack.pop()] == BRACKET_START) {
break;
}
}
break;
default:
break;
}
}
if (stack.isEmpty()) {
// Single element
parsedElements.add(expr);
} else {
Iterator<Integer> it = stack.iterator();
int currentPos = 0;
while (it.hasNext()) {
int pos = it.next();
String sub = expr.substring(currentPos, pos);
if (sub != null && !sub.isEmpty()) {
parsedElements.add(sub);
}
currentPos = pos+1;
}
if (currentPos < expr.length()) {
parsedElements.add(expr.substring(currentPos, expr.length()));
}
}
iterator = parsedElements.iterator();
}
@Override
public boolean hasMoreElements() {
return iterator.hasNext();
}
@Override
public String nextElement() {
return iterator.next();
}
}
/**
* Constraint parser used to parse a given target expression, such as
* "NOTIN, NODE, foo, bar".
*/
public static class TargetConstraintParser extends ConstraintParser {
public TargetConstraintParser(String expression) {
super(new BaseStringTokenizer(expression,
String.valueOf(EXPRESSION_VAL_DELIM)));
}
@Override
public AbstractConstraint parse()
throws PlacementConstraintParseException {
PlacementConstraint.AbstractConstraint placementConstraints;
String op = nextToken();
if (op.equalsIgnoreCase(IN) || op.equalsIgnoreCase(NOT_IN)) {
String scope = nextToken();
scope = parseScope(scope);
Set<String> allocationTags = new TreeSet<>();
while(hasMoreTokens()) {
String tag = nextToken();
allocationTags.add(tag);
}
PlacementConstraint.TargetExpression target =
PlacementConstraints.PlacementTargets.allocationTag(
allocationTags.toArray(new String[allocationTags.size()]));
if (op.equalsIgnoreCase(IN)) {
placementConstraints = PlacementConstraints
.targetIn(scope, target);
} else {
placementConstraints = PlacementConstraints
.targetNotIn(scope, target);
}
} else {
throw new PlacementConstraintParseException(
"expecting " + IN + " or " + NOT_IN + ", but get " + op);
}
return placementConstraints;
}
}
/**
* Constraint parser used to parse a given target expression, such as
* "cardinality, NODE, foo, 0, 1".
*/
public static class CardinalityConstraintParser extends ConstraintParser {
public CardinalityConstraintParser(String expr) {
super(new BaseStringTokenizer(expr,
String.valueOf(EXPRESSION_VAL_DELIM)));
}
@Override
public AbstractConstraint parse()
throws PlacementConstraintParseException {
String op = nextToken();
if (!op.equalsIgnoreCase(CARDINALITY)) {
throw new PlacementConstraintParseException("expecting " + CARDINALITY
+ " , but met " + op);
}
shouldHaveNext();
String scope = nextToken();
scope = parseScope(scope);
Stack<String> resetElements = new Stack<>();
while(hasMoreTokens()) {
resetElements.add(nextToken());
}
// At least 3 elements
if (resetElements.size() < 3) {
throw new PlacementConstraintParseException(
"Invalid syntax for a cardinality expression, expecting"
+ " \"cardinality,SCOPE,TARGET_TAG,...,TARGET_TAG,"
+ "MIN_CARDINALITY,MAX_CARDINALITY\" at least 5 elements,"
+ " but only " + (resetElements.size() + 2) + " is given.");
}
String maxCardinalityStr = resetElements.pop();
Integer max = toInt(maxCardinalityStr);
String minCardinalityStr = resetElements.pop();
Integer min = toInt(minCardinalityStr);
ArrayList<String> targetTags = new ArrayList<>();
while (!resetElements.empty()) {
targetTags.add(resetElements.pop());
}
return PlacementConstraints.cardinality(scope, min, max,
targetTags.toArray(new String[targetTags.size()]));
}
}
/**
* Parser used to parse conjunction form of constraints, such as
* AND(A, ..., B), OR(A, ..., B).
*/
public static class ConjunctionConstraintParser extends ConstraintParser {
public ConjunctionConstraintParser(String expr) {
super(new ConjunctionTokenizer(expr));
}
@Override
public AbstractConstraint parse() throws PlacementConstraintParseException {
// do pre-process, validate input.
validate();
String op = nextToken();
shouldHaveNext();
List<AbstractConstraint> constraints = new ArrayList<>();
while(hasMoreTokens()) {
// each child expression can be any valid form of
// constraint expressions.
String constraintStr = nextToken();
AbstractConstraint constraint = parseExpression(constraintStr);
constraints.add(constraint);
}
if (AND.equalsIgnoreCase(op)) {
return PlacementConstraints.and(
constraints.toArray(
new AbstractConstraint[constraints.size()]));
} else if (OR.equalsIgnoreCase(op)) {
return PlacementConstraints.or(
constraints.toArray(
new AbstractConstraint[constraints.size()]));
} else {
throw new PlacementConstraintParseException(
"Unexpected conjunction operator : " + op
+ ", expecting " + AND + " or " + OR);
}
}
}
/**
* A helper class to encapsulate source tags and allocations in the
* placement specification.
*/
public static final class SourceTags {
private String tag;
private int num;
private SourceTags(String sourceTag, int number) {
this.tag = sourceTag;
this.num = number;
}
public String getTag() {
return this.tag;
}
public int getNumOfAllocations() {
return this.num;
}
/**
* Parses source tags from expression "sourceTags=numOfAllocations".
* @param expr
* @return source tags, see {@link SourceTags}
* @throws PlacementConstraintParseException
*/
public static SourceTags parseFrom(String expr)
throws PlacementConstraintParseException {
SourceTagsTokenizer stt = new SourceTagsTokenizer(expr);
stt.validate();
// During validation we already checked the number of parsed elements.
String allocTag = stt.nextElement();
int allocNum = Integer.parseInt(stt.nextElement());
return new SourceTags(allocTag, allocNum);
}
}
/**
* Parses a given constraint expression to a {@link AbstractConstraint},
* this expression can be any valid form of constraint expressions.
*
* @param constraintStr expression string
* @return a parsed {@link AbstractConstraint}
* @throws PlacementConstraintParseException when given expression
* is malformed
*/
public static AbstractConstraint parseExpression(String constraintStr)
throws PlacementConstraintParseException {
// Try parse given expression with all allowed constraint parsers,
// fails if no one could parse it.
TargetConstraintParser tp = new TargetConstraintParser(constraintStr);
Optional<AbstractConstraint> constraintOptional =
Optional.ofNullable(tp.tryParse());
if (!constraintOptional.isPresent()) {
CardinalityConstraintParser cp =
new CardinalityConstraintParser(constraintStr);
constraintOptional = Optional.ofNullable(cp.tryParse());
if (!constraintOptional.isPresent()) {
ConjunctionConstraintParser jp =
new ConjunctionConstraintParser(constraintStr);
constraintOptional = Optional.ofNullable(jp.tryParse());
}
if (!constraintOptional.isPresent()) {
throw new PlacementConstraintParseException(
"Invalid constraint expression " + constraintStr);
}
}
return constraintOptional.get();
}
/**
* Parses a placement constraint specification. A placement constraint spec
* is a composite expression which is composed by multiple sub constraint
* expressions delimited by ":". With following syntax:
*
* <p>Tag1=N1,P1:Tag2=N2,P2:...:TagN=Nn,Pn</p>
*
* where <b>TagN=Nn</b> is a key value pair to determine the source
* allocation tag and the number of allocations, such as:
*
* <p>foo=3</p>
*
* and where <b>Pn</b> can be any form of a valid constraint expression,
* such as:
*
* <ul>
* <li>in,node,foo,bar</li>
* <li>notin,node,foo,bar,1,2</li>
* <li>and(notin,node,foo:notin,node,bar)</li>
* </ul>
* @param expression expression string.
* @return a map of source tags to placement constraint mapping.
* @throws PlacementConstraintParseException
*/
public static Map<SourceTags, PlacementConstraint> parsePlacementSpec(
String expression) throws PlacementConstraintParseException {
// Respect insertion order.
Map<SourceTags, PlacementConstraint> result = new LinkedHashMap<>();
PlacementConstraintParser.ConstraintTokenizer tokenizer =
new PlacementConstraintParser.MultipleConstraintsTokenizer(expression);
tokenizer.validate();
while(tokenizer.hasMoreElements()) {
String specStr = tokenizer.nextElement();
// each spec starts with sourceAllocationTag=numOfContainers and
// followed by a constraint expression.
// foo=4,Pn
String[] splitted = specStr.split(
String.valueOf(EXPRESSION_VAL_DELIM), 2);
if (splitted.length != 2) {
throw new PlacementConstraintParseException(
"Unexpected placement constraint expression " + specStr);
}
String tagAlloc = splitted[0];
SourceTags st = SourceTags.parseFrom(tagAlloc);
String exprs = splitted[1];
AbstractConstraint constraint =
PlacementConstraintParser.parseExpression(exprs);
result.put(st, constraint.build());
}
return result;
}
}

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.util.constraint contains classes
* which is used as utility class for placement constraints.
*/
package org.apache.hadoop.yarn.util.constraint;

View File

@ -0,0 +1,372 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.resource;
import com.google.common.collect.Sets;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.SingleConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParseException;
import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser;
import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.SourceTags;
import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.TargetConstraintParser;
import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.ConstraintParser;
import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.CardinalityConstraintParser;
import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.ConjunctionConstraintParser;
import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.MultipleConstraintsTokenizer;
import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.SourceTagsTokenizer;
import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.ConstraintTokenizer;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.*;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
import org.junit.Assert;
import org.junit.Test;
/**
* Class to test placement constraint parser.
*/
public class TestPlacementConstraintParser {
@Test
public void testTargetExpressionParser()
throws PlacementConstraintParseException {
ConstraintParser parser;
AbstractConstraint constraint;
SingleConstraint single;
// Anti-affinity with single target tag
// NOTIN,NDOE,foo
parser = new TargetConstraintParser("NOTIN, NODE, foo");
constraint = parser.parse();
Assert.assertTrue(constraint instanceof SingleConstraint);
single = (SingleConstraint) constraint;
Assert.assertEquals("node", single.getScope());
Assert.assertEquals(0, single.getMinCardinality());
Assert.assertEquals(0, single.getMaxCardinality());
// lower cases is also valid
parser = new TargetConstraintParser("notin, node, foo");
constraint = parser.parse();
Assert.assertTrue(constraint instanceof SingleConstraint);
single = (SingleConstraint) constraint;
Assert.assertEquals("node", single.getScope());
Assert.assertEquals(0, single.getMinCardinality());
Assert.assertEquals(0, single.getMaxCardinality());
// Affinity with single target tag
// IN,NODE,foo
parser = new TargetConstraintParser("IN, NODE, foo");
constraint = parser.parse();
Assert.assertTrue(constraint instanceof SingleConstraint);
single = (SingleConstraint) constraint;
Assert.assertEquals("node", single.getScope());
Assert.assertEquals(1, single.getMinCardinality());
Assert.assertEquals(Integer.MAX_VALUE, single.getMaxCardinality());
// Anti-affinity with multiple target tags
// NOTIN,NDOE,foo,bar,exp
parser = new TargetConstraintParser("NOTIN, NODE, foo, bar, exp");
constraint = parser.parse();
Assert.assertTrue(constraint instanceof SingleConstraint);
single = (SingleConstraint) constraint;
Assert.assertEquals("node", single.getScope());
Assert.assertEquals(0, single.getMinCardinality());
Assert.assertEquals(0, single.getMaxCardinality());
Assert.assertEquals(1, single.getTargetExpressions().size());
TargetExpression exp =
single.getTargetExpressions().iterator().next();
Assert.assertEquals("ALLOCATION_TAG", exp.getTargetType().toString());
Assert.assertEquals(3, exp.getTargetValues().size());
// Invalid OP
parser = new TargetConstraintParser("XYZ, NODE, foo");
try {
parser.parse();
} catch (Exception e) {
Assert.assertTrue(e instanceof PlacementConstraintParseException);
Assert.assertTrue(e.getMessage().contains("expecting in or notin"));
}
}
@Test
public void testCardinalityConstraintParser()
throws PlacementConstraintParseException {
ConstraintParser parser;
AbstractConstraint constraint;
SingleConstraint single;
// cardinality,NODE,foo,0,1
parser = new CardinalityConstraintParser("cardinality, NODE, foo, 0, 1");
constraint = parser.parse();
Assert.assertTrue(constraint instanceof SingleConstraint);
single = (SingleConstraint) constraint;
Assert.assertEquals("node", single.getScope());
Assert.assertEquals(0, single.getMinCardinality());
Assert.assertEquals(1, single.getMaxCardinality());
Assert.assertEquals(1, single.getTargetExpressions().size());
TargetExpression exp =
single.getTargetExpressions().iterator().next();
Assert.assertEquals("ALLOCATION_TAG", exp.getTargetType().toString());
Assert.assertEquals(1, exp.getTargetValues().size());
Assert.assertEquals("foo", exp.getTargetValues().iterator().next());
// cardinality,NODE,foo,bar,moo,0,1
parser = new CardinalityConstraintParser(
"cardinality,RACK,foo,bar,moo,0,1");
constraint = parser.parse();
Assert.assertTrue(constraint instanceof SingleConstraint);
single = (SingleConstraint) constraint;
Assert.assertEquals("rack", single.getScope());
Assert.assertEquals(0, single.getMinCardinality());
Assert.assertEquals(1, single.getMaxCardinality());
Assert.assertEquals(1, single.getTargetExpressions().size());
exp = single.getTargetExpressions().iterator().next();
Assert.assertEquals("ALLOCATION_TAG", exp.getTargetType().toString());
Assert.assertEquals(3, exp.getTargetValues().size());
Set<String> expectedTags = Sets.newHashSet("foo", "bar", "moo");
Assert.assertTrue(Sets.difference(expectedTags, exp.getTargetValues())
.isEmpty());
// Invalid scope string
try {
parser = new CardinalityConstraintParser(
"cardinality,NOWHERE,foo,bar,moo,0,1");
parser.parse();
Assert.fail("Expecting a parsing failure!");
} catch (PlacementConstraintParseException e) {
Assert.assertTrue(e.getMessage()
.contains("expecting scope to node or rack, but met NOWHERE"));
}
// Invalid number of expression elements
try {
parser = new CardinalityConstraintParser(
"cardinality,NODE,0,1");
parser.parse();
Assert.fail("Expecting a parsing failure!");
} catch (PlacementConstraintParseException e) {
Assert.assertTrue(e.getMessage()
.contains("at least 5 elements, but only 4 is given"));
}
}
@Test
public void testAndConstraintParser()
throws PlacementConstraintParseException {
ConstraintParser parser;
AbstractConstraint constraint;
And and;
parser = new ConjunctionConstraintParser(
"AND(NOTIN,NODE,foo:NOTIN,NODE,bar)");
constraint = parser.parse();
Assert.assertTrue(constraint instanceof And);
and = (And) constraint;
Assert.assertEquals(2, and.getChildren().size());
parser = new ConjunctionConstraintParser(
"AND(NOTIN,NODE,foo:cardinality,NODE,foo,0,1)");
constraint = parser.parse();
Assert.assertTrue(constraint instanceof And);
Assert.assertEquals(2, and.getChildren().size());
parser = new ConjunctionConstraintParser(
"AND(NOTIN,NODE,foo:AND(NOTIN,NODE,foo:cardinality,NODE,foo,0,1))");
constraint = parser.parse();
Assert.assertTrue(constraint instanceof And);
and = (And) constraint;
Assert.assertTrue(and.getChildren().get(0) instanceof SingleConstraint);
Assert.assertTrue(and.getChildren().get(1) instanceof And);
and = (And) and.getChildren().get(1);
Assert.assertEquals(2, and.getChildren().size());
}
@Test
public void testMultipleConstraintsTokenizer()
throws PlacementConstraintParseException {
MultipleConstraintsTokenizer ct;
SourceTagsTokenizer st;
TokenizerTester mp;
ct = new MultipleConstraintsTokenizer(
"foo=1,A1,A2,A3:bar=2,B1,B2:moo=3,C1,C2");
mp = new TokenizerTester(ct,
"foo=1,A1,A2,A3", "bar=2,B1,B2", "moo=3,C1,C2");
mp.verify();
ct = new MultipleConstraintsTokenizer(
"foo=1,AND(A2:A3):bar=2,OR(B1:AND(B2:B3)):moo=3,C1,C2");
mp = new TokenizerTester(ct,
"foo=1,AND(A2:A3)", "bar=2,OR(B1:AND(B2:B3))", "moo=3,C1,C2");
mp.verify();
ct = new MultipleConstraintsTokenizer("A:B:C");
mp = new TokenizerTester(ct, "A", "B", "C");
mp.verify();
ct = new MultipleConstraintsTokenizer("A:AND(B:C):D");
mp = new TokenizerTester(ct, "A", "AND(B:C)", "D");
mp.verify();
ct = new MultipleConstraintsTokenizer("A:AND(B:OR(C:D)):E");
mp = new TokenizerTester(ct, "A", "AND(B:OR(C:D))", "E");
mp.verify();
ct = new MultipleConstraintsTokenizer("A:AND(B:OR(C:D)):E");
mp = new TokenizerTester(ct, "A", "AND(B:OR(C:D))", "E");
mp.verify();
st = new SourceTagsTokenizer("A=4");
mp = new TokenizerTester(st, "A", "4");
mp.verify();
try {
st = new SourceTagsTokenizer("A=B");
mp = new TokenizerTester(st, "A", "B");
mp.verify();
Assert.fail("Expecting a parsing failure");
} catch (PlacementConstraintParseException e) {
Assert.assertTrue(e.getMessage()
.contains("Value of the expression must be an integer"));
}
}
private static class TokenizerTester {
private ConstraintTokenizer tokenizer;
private String[] expectedExtractions;
protected TokenizerTester(ConstraintTokenizer tk,
String... expctedStrings) {
this.tokenizer = tk;
this.expectedExtractions = expctedStrings;
}
void verify()
throws PlacementConstraintParseException {
tokenizer.validate();
int i = 0;
while (tokenizer.hasMoreElements()) {
String current = tokenizer.nextElement();
Assert.assertTrue(i < expectedExtractions.length);
Assert.assertEquals(expectedExtractions[i], current);
i++;
}
}
}
@Test
public void testParsePlacementSpec()
throws PlacementConstraintParseException {
Map<SourceTags, PlacementConstraint> result;
PlacementConstraint expectedPc1, expectedPc2;
PlacementConstraint actualPc1, actualPc2;
SourceTags tag1, tag2;
// A single anti-affinity constraint
result = PlacementConstraintParser
.parsePlacementSpec("foo=3,notin,node,foo");
Assert.assertEquals(1, result.size());
tag1 = result.keySet().iterator().next();
Assert.assertEquals("foo", tag1.getTag());
Assert.assertEquals(3, tag1.getNumOfAllocations());
expectedPc1 = targetNotIn("node", allocationTag("foo")).build();
actualPc1 = result.values().iterator().next();
Assert.assertEquals(expectedPc1, actualPc1);
// Upper case
result = PlacementConstraintParser
.parsePlacementSpec("foo=3,NOTIN,NODE,foo");
Assert.assertEquals(1, result.size());
tag1 = result.keySet().iterator().next();
Assert.assertEquals("foo", tag1.getTag());
Assert.assertEquals(3, tag1.getNumOfAllocations());
expectedPc1 = targetNotIn("node", allocationTag("foo")).build();
actualPc1 = result.values().iterator().next();
Assert.assertEquals(expectedPc1, actualPc1);
// A single cardinality constraint
result = PlacementConstraintParser
.parsePlacementSpec("foo=10,cardinality,node,foo,bar,0,100");
Assert.assertEquals(1, result.size());
tag1 = result.keySet().iterator().next();
Assert.assertEquals("foo", tag1.getTag());
Assert.assertEquals(10, tag1.getNumOfAllocations());
expectedPc1 = cardinality("node", 0, 100, "foo", "bar").build();
Assert.assertEquals(expectedPc1, result.values().iterator().next());
// Two constraint expressions
result = PlacementConstraintParser
.parsePlacementSpec("foo=3,notin,node,foo:bar=2,in,node,foo");
Assert.assertEquals(2, result.size());
Iterator<SourceTags> keyIt = result.keySet().iterator();
tag1 = keyIt.next();
Assert.assertEquals("foo", tag1.getTag());
Assert.assertEquals(3, tag1.getNumOfAllocations());
tag2 = keyIt.next();
Assert.assertEquals("bar", tag2.getTag());
Assert.assertEquals(2, tag2.getNumOfAllocations());
Iterator<PlacementConstraint> valueIt = result.values().iterator();
expectedPc1 = targetNotIn("node", allocationTag("foo")).build();
expectedPc2 = targetIn("node", allocationTag("foo")).build();
Assert.assertEquals(expectedPc1, valueIt.next());
Assert.assertEquals(expectedPc2, valueIt.next());
// And constraint
result = PlacementConstraintParser
.parsePlacementSpec("foo=1000,and(notin,node,bar:in,node,foo)");
Assert.assertEquals(1, result.size());
keyIt = result.keySet().iterator();
tag1 = keyIt.next();
Assert.assertEquals("foo", tag1.getTag());
Assert.assertEquals(1000, tag1.getNumOfAllocations());
actualPc1 = result.values().iterator().next();
expectedPc1 = and(targetNotIn("node", allocationTag("bar")),
targetIn("node", allocationTag("foo"))).build();
Assert.assertEquals(expectedPc1, actualPc1);
// Multiple constraints with nested forms.
result = PlacementConstraintParser.parsePlacementSpec(
"foo=1000,and(notin,node,bar:or(in,node,foo:in,node,moo))"
+ ":bar=200,notin,node,foo");
Assert.assertEquals(2, result.size());
keyIt = result.keySet().iterator();
tag1 = keyIt.next();
tag2 = keyIt.next();
Assert.assertEquals("foo", tag1.getTag());
Assert.assertEquals(1000, tag1.getNumOfAllocations());
Assert.assertEquals("bar", tag2.getTag());
Assert.assertEquals(200, tag2.getNumOfAllocations());
valueIt = result.values().iterator();
actualPc1 = valueIt.next();
actualPc2 = valueIt.next();
expectedPc1 = and(targetNotIn("node", allocationTag("bar")),
or(targetIn("node", allocationTag("foo")),
targetIn("node", allocationTag("moo")))).build();
Assert.assertEquals(actualPc1, expectedPc1);
expectedPc2 = targetNotIn("node", allocationTag("foo")).build();
Assert.assertEquals(expectedPc2, actualPc2);
}
}

View File

@ -28,6 +28,7 @@ import java.lang.reflect.UndeclaredThrowableException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -38,6 +39,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.Vector; import java.util.Vector;
import java.util.Base64;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -671,8 +673,14 @@ public class ApplicationMaster {
} }
private void parsePlacementSpecs(String placementSpecifications) { private void parsePlacementSpecs(String placementSpecifications) {
// Client sends placement spec in encoded format
Base64.Decoder decoder = Base64.getDecoder();
byte[] decodedBytes = decoder.decode(
placementSpecifications.getBytes(StandardCharsets.UTF_8));
String decodedSpec = new String(decodedBytes, StandardCharsets.UTF_8);
LOG.info("Decode placement spec: " + decodedSpec);
Map<String, PlacementSpec> pSpecs = Map<String, PlacementSpec> pSpecs =
PlacementSpec.parse(placementSpecifications); PlacementSpec.parse(decodedSpec);
this.placementSpecs = new HashMap<>(); this.placementSpecs = new HashMap<>();
this.numTotalContainers = 0; this.numTotalContainers = 0;
for (PlacementSpec pSpec : pSpecs.values()) { for (PlacementSpec pSpec : pSpecs.values()) {

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.applications.distributedshell;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -28,6 +29,7 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.Vector; import java.util.Vector;
import java.util.Arrays; import java.util.Arrays;
import java.util.Base64;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
@ -857,7 +859,11 @@ public class Client {
} }
vargs.add("--num_containers " + String.valueOf(numContainers)); vargs.add("--num_containers " + String.valueOf(numContainers));
if (placementSpec != null && placementSpec.length() > 0) { if (placementSpec != null && placementSpec.length() > 0) {
vargs.add("--placement_spec " + placementSpec); // Encode the spec to avoid passing special chars via shell arguments.
String encodedSpec = Base64.getEncoder()
.encodeToString(placementSpec.getBytes(StandardCharsets.UTF_8));
LOG.info("Encode placement spec: " + encodedSpec);
vargs.add("--placement_spec " + encodedSpec);
} }
if (null != nodeLabelExpression) { if (null != nodeLabelExpression) {
appContext.setNodeLabelExpression(nodeLabelExpression); appContext.setNodeLabelExpression(nodeLabelExpression);

View File

@ -18,13 +18,14 @@
package org.apache.hadoop.yarn.applications.distributedshell; package org.apache.hadoop.yarn.applications.distributedshell;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints; import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParseException;
import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser;
import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.SourceTags;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Scanner;
/** /**
* Class encapsulating a SourceTag, number of container and a Placement * Class encapsulating a SourceTag, number of container and a Placement
@ -34,12 +35,6 @@ public class PlacementSpec {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(PlacementSpec.class); LoggerFactory.getLogger(PlacementSpec.class);
private static final String SPEC_DELIM = ":";
private static final String KV_SPLIT_DELIM = "=";
private static final String SPEC_VAL_DELIM = ",";
private static final String IN = "in";
private static final String NOT_IN = "notin";
private static final String CARDINALITY = "cardinality";
public final String sourceTag; public final String sourceTag;
public final int numContainers; public final int numContainers;
@ -73,65 +68,28 @@ public class PlacementSpec {
* @param specs Placement spec. * @param specs Placement spec.
* @return Mapping from source tag to placement constraint. * @return Mapping from source tag to placement constraint.
*/ */
public static Map<String, PlacementSpec> parse(String specs) { public static Map<String, PlacementSpec> parse(String specs)
throws IllegalArgumentException {
LOG.info("Parsing Placement Specs: [{}]", specs); LOG.info("Parsing Placement Specs: [{}]", specs);
Scanner s = new Scanner(specs).useDelimiter(SPEC_DELIM);
Map<String, PlacementSpec> pSpecs = new HashMap<>(); Map<String, PlacementSpec> pSpecs = new HashMap<>();
while (s.hasNext()) { Map<SourceTags, PlacementConstraint> parsed;
String sp = s.next(); try {
LOG.info("Parsing Spec: [{}]", sp); parsed = PlacementConstraintParser.parsePlacementSpec(specs);
String[] specSplit = sp.split(KV_SPLIT_DELIM); for (Map.Entry<SourceTags, PlacementConstraint> entry :
String sourceTag = specSplit[0]; parsed.entrySet()) {
Scanner ps = new Scanner(specSplit[1]).useDelimiter(SPEC_VAL_DELIM); LOG.info("Parsed source tag: {}, number of allocations: {}",
int numContainers = ps.nextInt(); entry.getKey().getTag(), entry.getKey().getNumOfAllocations());
if (!ps.hasNext()) { LOG.info("Parsed constraint: {}", entry.getValue()
pSpecs.put(sourceTag, .getConstraintExpr().getClass().getSimpleName());
new PlacementSpec(sourceTag, numContainers, null)); pSpecs.put(entry.getKey().getTag(), new PlacementSpec(
LOG.info("Creating Spec without constraint {}: num[{}]", entry.getKey().getTag(),
sourceTag, numContainers); entry.getKey().getNumOfAllocations(),
continue; entry.getValue()));
} }
String cType = ps.next().toLowerCase(); return pSpecs;
String scope = ps.next().toLowerCase(); } catch (PlacementConstraintParseException e) {
throw new IllegalArgumentException(
String targetTag = ps.next(); "Invalid placement spec: " + specs, e);
scope = scope.equals("rack") ? PlacementConstraints.RACK :
PlacementConstraints.NODE;
PlacementConstraint pc;
if (cType.equals(IN)) {
pc = PlacementConstraints.build(
PlacementConstraints.targetIn(scope,
PlacementConstraints.PlacementTargets.allocationTag(
targetTag)));
LOG.info("Creating IN Constraint for source tag [{}], num[{}]: " +
"scope[{}], target[{}]",
sourceTag, numContainers, scope, targetTag);
} else if (cType.equals(NOT_IN)) {
pc = PlacementConstraints.build(
PlacementConstraints.targetNotIn(scope,
PlacementConstraints.PlacementTargets.allocationTag(
targetTag)));
LOG.info("Creating NOT_IN Constraint for source tag [{}], num[{}]: " +
"scope[{}], target[{}]",
sourceTag, numContainers, scope, targetTag);
} else if (cType.equals(CARDINALITY)) {
int minCard = ps.nextInt();
int maxCard = ps.nextInt();
pc = PlacementConstraints.build(
PlacementConstraints.targetCardinality(scope, minCard, maxCard,
PlacementConstraints.PlacementTargets.allocationTag(
targetTag)));
LOG.info("Creating CARDINALITY Constraint source tag [{}], num[{}]: " +
"scope[{}], min[{}], max[{}], target[{}]",
sourceTag, numContainers, scope, minCard, maxCard, targetTag);
} else {
throw new RuntimeException(
"Could not parse constraintType [" + cType + "]" +
" in [" + specSplit[1] + "]");
}
pSpecs.put(sourceTag, new PlacementSpec(sourceTag, numContainers, pc));
} }
return pSpecs;
} }
} }