YARN-7863. Modify placement constraints to support node attributes. Contributed by Sunil Govindan.

This commit is contained in:
Naganarasimha 2018-08-27 10:27:33 +08:00 committed by Sunil G
parent 8c947398ab
commit 67ae81f0e0
19 changed files with 568 additions and 46 deletions

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
/**
* Enumeration of various node attribute op codes.
*/
@Public
@Evolving
public enum NodeAttributeOpCode {
/**
* Default as No OP.
*/
NO_OP,
/**
* EQUALS op code for Attribute.
*/
EQ,
/**
* NOT EQUALS op code for Attribute.
*/
NE
}

View File

@ -29,6 +29,7 @@ import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
/**
* {@code PlacementConstraint} represents a placement constraint for a resource
@ -155,13 +156,22 @@ public class PlacementConstraint {
private int minCardinality;
private int maxCardinality;
private Set<TargetExpression> targetExpressions;
private NodeAttributeOpCode attributeOpCode;
public SingleConstraint(String scope, int minCardinality,
int maxCardinality, Set<TargetExpression> targetExpressions) {
int maxCardinality, NodeAttributeOpCode opCode,
Set<TargetExpression> targetExpressions) {
this.scope = scope;
this.minCardinality = minCardinality;
this.maxCardinality = maxCardinality;
this.targetExpressions = targetExpressions;
this.attributeOpCode = opCode;
}
public SingleConstraint(String scope, int minCardinality,
int maxCardinality, Set<TargetExpression> targetExpressions) {
this(scope, minCardinality, maxCardinality, NodeAttributeOpCode.NO_OP,
targetExpressions);
}
public SingleConstraint(String scope, int minC, int maxC,
@ -169,6 +179,13 @@ public class PlacementConstraint {
this(scope, minC, maxC, new HashSet<>(Arrays.asList(targetExpressions)));
}
public SingleConstraint(String scope, int minC, int maxC,
NodeAttributeOpCode opCode,
TargetExpression... targetExpressions) {
this(scope, minC, maxC, opCode,
new HashSet<>(Arrays.asList(targetExpressions)));
}
/**
* Get the scope of the constraint.
*
@ -205,6 +222,15 @@ public class PlacementConstraint {
return targetExpressions;
}
/**
* Get the NodeAttributeOpCode of the constraint.
*
* @return nodeAttribute Op Code
*/
public NodeAttributeOpCode getNodeAttributeOpCode() {
return attributeOpCode;
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -225,6 +251,10 @@ public class PlacementConstraint {
if (!getScope().equals(that.getScope())) {
return false;
}
if (getNodeAttributeOpCode() != null && !getNodeAttributeOpCode()
.equals(that.getNodeAttributeOpCode())) {
return false;
}
return getTargetExpressions().equals(that.getTargetExpressions());
}
@ -233,6 +263,7 @@ public class PlacementConstraint {
int result = getScope().hashCode();
result = 31 * result + getMinCardinality();
result = 31 * result + getMaxCardinality();
result = 31 * result + getNodeAttributeOpCode().hashCode();
result = 31 * result + getTargetExpressions().hashCode();
return result;
}
@ -259,6 +290,13 @@ public class PlacementConstraint {
.append(getScope()).append(",")
.append(targetExpr)
.toString());
} else if (min == -1 && max == -1) {
// node attribute
targetConstraints.add(new StringBuilder()
.append(getScope()).append(",")
.append(getNodeAttributeOpCode()).append(",")
.append(targetExpr)
.toString());
} else {
// cardinality
targetConstraints.add(new StringBuilder()

View File

@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.DelayedOr;
@ -85,6 +86,24 @@ public final class PlacementConstraints {
return new SingleConstraint(scope, 0, 0, targetExpressions);
}
/**
* Creates a constraint that requires allocations to be placed on nodes that
* belong to a scope (e.g., node or rack) that satisfy any of the
* target expressions based on node attribute op code.
*
* @param scope the scope within which the target expressions should not be
* true
* @param opCode Node Attribute code which could be equals, not equals.
* @param targetExpressions the expressions that need to not be true within
* the scope
* @return the resulting placement constraint
*/
public static AbstractConstraint targetNodeAttribute(String scope,
NodeAttributeOpCode opCode,
TargetExpression... targetExpressions) {
return new SingleConstraint(scope, -1, -1, opCode, targetExpressions);
}
/**
* Creates a constraint that restricts the number of allocations within a
* given scope (e.g., node or rack).

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.util.constraint;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
@ -44,11 +45,12 @@ import java.util.regex.Pattern;
@InterfaceStability.Unstable
public final class PlacementConstraintParser {
public static final char EXPRESSION_VAL_DELIM = ',';
private static final char EXPRESSION_DELIM = ':';
private static final char KV_SPLIT_DELIM = '=';
private static final char EXPRESSION_VAL_DELIM = ',';
private static final char BRACKET_START = '(';
private static final char BRACKET_END = ')';
private static final String KV_NE_DELIM = "!=";
private static final String IN = "in";
private static final String NOT_IN = "notin";
private static final String AND = "and";
@ -349,6 +351,91 @@ public final class PlacementConstraintParser {
}
}
/**
* Constraint parser used to parse a given target expression.
*/
public static class NodeConstraintParser extends ConstraintParser {
public NodeConstraintParser(String expression) {
super(new BaseStringTokenizer(expression,
String.valueOf(EXPRESSION_VAL_DELIM)));
}
@Override
public AbstractConstraint parse()
throws PlacementConstraintParseException {
PlacementConstraint.AbstractConstraint placementConstraints = null;
String attributeName = "";
NodeAttributeOpCode opCode = NodeAttributeOpCode.EQ;
String scope = SCOPE_NODE;
Set<String> constraintEntities = new TreeSet<>();
while (hasMoreTokens()) {
String currentTag = nextToken();
StringTokenizer attributeKV = getAttributeOpCodeTokenizer(currentTag);
// Usually there will be only one k=v pair. However in case when
// multiple values are present for same attribute, it will also be
// coming as next token. for example, java=1.8,1.9 or python!=2.
if (attributeKV.countTokens() > 1) {
opCode = getAttributeOpCode(currentTag);
attributeName = attributeKV.nextToken();
currentTag = attributeKV.nextToken();
}
constraintEntities.add(currentTag);
}
if(attributeName.isEmpty()) {
throw new PlacementConstraintParseException(
"expecting valid expression like k=v or k!=v, but get "
+ constraintEntities);
}
PlacementConstraint.TargetExpression target = null;
if (!constraintEntities.isEmpty()) {
target = PlacementConstraints.PlacementTargets
.nodeAttribute(attributeName,
constraintEntities
.toArray(new String[constraintEntities.size()]));
}
placementConstraints = PlacementConstraints
.targetNodeAttribute(scope, opCode, target);
return placementConstraints;
}
private StringTokenizer getAttributeOpCodeTokenizer(String currentTag) {
StringTokenizer attributeKV = new StringTokenizer(currentTag,
KV_NE_DELIM);
// Try with '!=' delim as well.
if (attributeKV.countTokens() < 2) {
attributeKV = new StringTokenizer(currentTag,
String.valueOf(KV_SPLIT_DELIM));
}
return attributeKV;
}
/**
* Below conditions are validated.
* java=8 : OpCode = EQUALS
* java!=8 : OpCode = NEQUALS
* @param currentTag tag
* @return Attribute op code.
*/
private NodeAttributeOpCode getAttributeOpCode(String currentTag)
throws PlacementConstraintParseException {
if (currentTag.contains(KV_NE_DELIM)) {
return NodeAttributeOpCode.NE;
} else if (currentTag.contains(String.valueOf(KV_SPLIT_DELIM))) {
return NodeAttributeOpCode.EQ;
}
throw new PlacementConstraintParseException(
"expecting valid expression like k=v or k!=v, but get "
+ currentTag);
}
}
/**
* Constraint parser used to parse a given target expression, such as
* "NOTIN, NODE, foo, bar".
@ -363,20 +450,23 @@ public final class PlacementConstraintParser {
@Override
public AbstractConstraint parse()
throws PlacementConstraintParseException {
PlacementConstraint.AbstractConstraint placementConstraints;
PlacementConstraint.AbstractConstraint placementConstraints = null;
String op = nextToken();
if (op.equalsIgnoreCase(IN) || op.equalsIgnoreCase(NOT_IN)) {
String scope = nextToken();
scope = parseScope(scope);
Set<String> allocationTags = new TreeSet<>();
Set<String> constraintEntities = new TreeSet<>();
while(hasMoreTokens()) {
String tag = nextToken();
allocationTags.add(tag);
constraintEntities.add(tag);
}
PlacementConstraint.TargetExpression target = null;
if(!constraintEntities.isEmpty()) {
target = PlacementConstraints.PlacementTargets.allocationTag(
constraintEntities
.toArray(new String[constraintEntities.size()]));
}
PlacementConstraint.TargetExpression target =
PlacementConstraints.PlacementTargets.allocationTag(
allocationTags.toArray(new String[allocationTags.size()]));
if (op.equalsIgnoreCase(IN)) {
placementConstraints = PlacementConstraints
.targetIn(scope, target);
@ -550,6 +640,11 @@ public final class PlacementConstraintParser {
new ConjunctionConstraintParser(constraintStr);
constraintOptional = Optional.ofNullable(jp.tryParse());
}
if (!constraintOptional.isPresent()) {
NodeConstraintParser np =
new NodeConstraintParser(constraintStr);
constraintOptional = Optional.ofNullable(np.tryParse());
}
if (!constraintOptional.isPresent()) {
throw new PlacementConstraintParseException(
"Invalid constraint expression " + constraintStr);
@ -584,12 +679,13 @@ public final class PlacementConstraintParser {
*/
public static Map<SourceTags, PlacementConstraint> parsePlacementSpec(
String expression) throws PlacementConstraintParseException {
// Continue handling for application tag based constraint otherwise.
// Respect insertion order.
Map<SourceTags, PlacementConstraint> result = new LinkedHashMap<>();
PlacementConstraintParser.ConstraintTokenizer tokenizer =
new PlacementConstraintParser.MultipleConstraintsTokenizer(expression);
tokenizer.validate();
while(tokenizer.hasMoreElements()) {
while (tokenizer.hasMoreElements()) {
String specStr = tokenizer.nextElement();
// each spec starts with sourceAllocationTag=numOfContainers and
// followed by a constraint expression.

View File

@ -646,11 +646,18 @@ message PlacementConstraintProto {
optional CompositePlacementConstraintProto compositeConstraint = 2;
}
enum NodeAttributeOpCodeProto {
NO_OP = 1;
EQ = 2;
NE = 3;
}
message SimplePlacementConstraintProto {
required string scope = 1;
repeated PlacementConstraintTargetProto targetExpressions = 2;
optional int32 minCardinality = 3;
optional int32 maxCardinality = 4;
optional NodeAttributeOpCodeProto attributeOpCode = 5;
}
message PlacementConstraintTargetProto {

View File

@ -22,6 +22,8 @@ import com.google.common.collect.Sets;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.Or;
@ -38,8 +40,14 @@ import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.Multiple
import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.SourceTagsTokenizer;
import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.ConstraintTokenizer;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.*;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.and;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.cardinality;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.or;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNodeAttribute;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
import org.junit.Assert;
import org.junit.Test;
@ -443,4 +451,55 @@ public class TestPlacementConstraintParser {
+ constrainExpr + ", caused by: " + e.getMessage());
}
}
@Test
public void testParseNodeAttributeSpec()
throws PlacementConstraintParseException {
Map<SourceTags, PlacementConstraint> result;
PlacementConstraint.AbstractConstraint expectedPc1, expectedPc2;
PlacementConstraint actualPc1, actualPc2;
// A single node attribute constraint
result = PlacementConstraintParser
.parsePlacementSpec("xyz=4,rm.yarn.io/foo=true");
Assert.assertEquals(1, result.size());
TargetExpression target = PlacementTargets
.nodeAttribute("rm.yarn.io/foo", "true");
expectedPc1 = targetNodeAttribute("node", NodeAttributeOpCode.EQ, target);
actualPc1 = result.values().iterator().next();
Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr());
// A single node attribute constraint
result = PlacementConstraintParser
.parsePlacementSpec("xyz=3,rm.yarn.io/foo!=abc");
Assert.assertEquals(1, result.size());
target = PlacementTargets
.nodeAttribute("rm.yarn.io/foo", "abc");
expectedPc1 = targetNodeAttribute("node", NodeAttributeOpCode.NE, target);
actualPc1 = result.values().iterator().next();
Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr());
actualPc1 = result.values().iterator().next();
Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr());
// A single node attribute constraint
result = PlacementConstraintParser
.parsePlacementSpec(
"xyz=1,rm.yarn.io/foo!=abc:zxy=1,rm.yarn.io/bar=true");
Assert.assertEquals(2, result.size());
target = PlacementTargets
.nodeAttribute("rm.yarn.io/foo", "abc");
expectedPc1 = targetNodeAttribute("node", NodeAttributeOpCode.NE, target);
target = PlacementTargets
.nodeAttribute("rm.yarn.io/bar", "true");
expectedPc2 = targetNodeAttribute("node", NodeAttributeOpCode.EQ, target);
Iterator<PlacementConstraint> valueIt = result.values().iterator();
actualPc1 = valueIt.next();
actualPc2 = valueIt.next();
Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr());
Assert.assertEquals(expectedPc2, actualPc2.getConstraintExpr());
}
}

View File

@ -523,9 +523,13 @@ public class ApplicationMaster {
if (cliParser.hasOption("placement_spec")) {
String placementSpec = cliParser.getOptionValue("placement_spec");
LOG.info("Placement Spec received [{}]", placementSpec);
parsePlacementSpecs(placementSpec);
String decodedSpec = getDecodedPlacementSpec(placementSpec);
LOG.info("Placement Spec received [{}]", decodedSpec);
this.numTotalContainers = 0;
parsePlacementSpecs(decodedSpec);
LOG.info("Total num containers requested [{}]", numTotalContainers);
if (numTotalContainers == 0) {
throw new IllegalArgumentException(
"Cannot run distributed shell with no containers");
@ -694,21 +698,23 @@ public class ApplicationMaster {
return true;
}
private void parsePlacementSpecs(String placementSpecifications) {
// Client sends placement spec in encoded format
private void parsePlacementSpecs(String decodedSpec) {
Map<String, PlacementSpec> pSpecs =
PlacementSpec.parse(decodedSpec);
this.placementSpecs = new HashMap<>();
for (PlacementSpec pSpec : pSpecs.values()) {
this.numTotalContainers += pSpec.getNumContainers();
this.placementSpecs.put(pSpec.sourceTag, pSpec);
}
}
private String getDecodedPlacementSpec(String placementSpecifications) {
Base64.Decoder decoder = Base64.getDecoder();
byte[] decodedBytes = decoder.decode(
placementSpecifications.getBytes(StandardCharsets.UTF_8));
String decodedSpec = new String(decodedBytes, StandardCharsets.UTF_8);
LOG.info("Decode placement spec: " + decodedSpec);
Map<String, PlacementSpec> pSpecs =
PlacementSpec.parse(decodedSpec);
this.placementSpecs = new HashMap<>();
this.numTotalContainers = 0;
for (PlacementSpec pSpec : pSpecs.values()) {
this.numTotalContainers += pSpec.numContainers;
this.placementSpecs.put(pSpec.sourceTag, pSpec);
}
return decodedSpec;
}
/**
@ -798,6 +804,7 @@ public class ApplicationMaster {
}
}
}
RegisterApplicationMasterResponse response = amRMClient
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
appMasterTrackingUrl, placementConstraintMap);
@ -845,14 +852,18 @@ public class ApplicationMaster {
// Keep looping until all the containers are launched and shell script
// executed on them ( regardless of success/failure).
if (this.placementSpecs == null) {
LOG.info("placementSpecs null");
for (int i = 0; i < numTotalContainersToRequest; ++i) {
ContainerRequest containerAsk = setupContainerAskForRM();
amRMClient.addContainerRequest(containerAsk);
}
} else {
LOG.info("placementSpecs to create req:" + placementSpecs);
List<SchedulingRequest> schedReqs = new ArrayList<>();
for (PlacementSpec pSpec : this.placementSpecs.values()) {
for (int i = 0; i < pSpec.numContainers; i++) {
LOG.info("placementSpec :" + pSpec + ", container:" + pSpec
.getNumContainers());
for (int i = 0; i < pSpec.getNumContainers(); i++) {
SchedulingRequest sr = setupSchedulingRequest(pSpec);
schedReqs.add(sr);
}

View File

@ -103,7 +103,7 @@ import org.slf4j.LoggerFactory;
* the provided shell command on a set of containers. </p>
*
* <p>This client is meant to act as an example on how to write yarn-based applications. </p>
*
*
* <p> To submit an application, a client first needs to connect to the <code>ResourceManager</code>
* aka ApplicationsManager or ASM via the {@link ApplicationClientProtocol}. The {@link ApplicationClientProtocol}
* provides a way for the client to get access to cluster information and to request for a
@ -192,6 +192,8 @@ public class Client {
// Placement specification
private String placementSpec = "";
// Node Attribute specification
private String nodeAttributeSpec = "";
// log4j.properties file
// if available, add to local resources and set into classpath
private String log4jPropFile = "";
@ -448,6 +450,7 @@ public class Client {
// Check if it is parsable
PlacementSpec.parse(this.placementSpec);
}
appName = cliParser.getOptionValue("appname", "DistributedShell");
amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
amQueue = cliParser.getOptionValue("queue", "default");

View File

@ -37,8 +37,8 @@ public class PlacementSpec {
LoggerFactory.getLogger(PlacementSpec.class);
public final String sourceTag;
public final int numContainers;
public final PlacementConstraint constraint;
private int numContainers;
public PlacementSpec(String sourceTag, int numContainers,
PlacementConstraint constraint) {
@ -47,6 +47,22 @@ public class PlacementSpec {
this.constraint = constraint;
}
/**
* Get the number of container for this spec.
* @return container count
*/
public int getNumContainers() {
return numContainers;
}
/**
* Set number of containers for this spec.
* @param numContainers number of containers.
*/
public void setNumContainers(int numContainers) {
this.numContainers = numContainers;
}
// Placement specification should be of the form:
// PlacementSpec => ""|KeyVal;PlacementSpec
// KeyVal => SourceTag=Constraint
@ -71,6 +87,7 @@ public class PlacementSpec {
public static Map<String, PlacementSpec> parse(String specs)
throws IllegalArgumentException {
LOG.info("Parsing Placement Specs: [{}]", specs);
Map<String, PlacementSpec> pSpecs = new HashMap<>();
Map<SourceTags, PlacementConstraint> parsed;
try {

View File

@ -26,6 +26,7 @@ import java.util.List;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
@ -37,6 +38,7 @@ import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TimedPlacementConstraint;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnProtos.CompositePlacementConstraintProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeAttributeOpCodeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PlacementConstraintProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PlacementConstraintTargetProto;
import org.apache.hadoop.yarn.proto.YarnProtos.SimplePlacementConstraintProto;
@ -73,7 +75,8 @@ public class PlacementConstraintFromProtoConverter {
}
return new SingleConstraint(proto.getScope(), proto.getMinCardinality(),
proto.getMaxCardinality(), targets);
proto.getMaxCardinality(),
convertFromProtoFormat(proto.getAttributeOpCode()), targets);
}
private TargetExpression convert(PlacementConstraintTargetProto proto) {
@ -113,4 +116,9 @@ public class PlacementConstraintFromProtoConverter {
return new TimedPlacementConstraint(pConstraint, proto.getSchedulingDelay(),
ProtoUtils.convertFromProtoFormat(proto.getDelayUnit()));
}
private static NodeAttributeOpCode convertFromProtoFormat(
NodeAttributeOpCodeProto p) {
return NodeAttributeOpCode.valueOf(p.name());
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.api.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TimedPlacementCon
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnProtos.CompositePlacementConstraintProto;
import org.apache.hadoop.yarn.proto.YarnProtos.CompositePlacementConstraintProto.CompositeType;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeAttributeOpCodeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PlacementConstraintProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PlacementConstraintTargetProto;
import org.apache.hadoop.yarn.proto.YarnProtos.SimplePlacementConstraintProto;
@ -72,6 +74,10 @@ public class PlacementConstraintToProtoConverter
}
sb.setMinCardinality(constraint.getMinCardinality());
sb.setMaxCardinality(constraint.getMaxCardinality());
if (constraint.getNodeAttributeOpCode() != null) {
sb.setAttributeOpCode(
convertToProtoFormat(constraint.getNodeAttributeOpCode()));
}
if (constraint.getTargetExpressions() != null) {
for (TargetExpression target : constraint.getTargetExpressions()) {
sb.addTargetExpressions(
@ -171,4 +177,9 @@ public class PlacementConstraintToProtoConverter
return tb.build();
}
private static NodeAttributeOpCodeProto convertToProtoFormat(
NodeAttributeOpCode p) {
return NodeAttributeOpCodeProto.valueOf(p.name());
}
}

View File

@ -519,9 +519,10 @@ public class ResourceManager extends CompositeService
return new RMNodeLabelsManager();
}
protected NodeAttributesManager createNodeAttributesManager()
throws InstantiationException, IllegalAccessException {
return new NodeAttributesManagerImpl();
protected NodeAttributesManager createNodeAttributesManager() {
NodeAttributesManagerImpl namImpl = new NodeAttributesManagerImpl();
namImpl.setRMContext(rmContext);
return namImpl;
}
protected AllocationTagsManager createAllocationTagsManager() {

View File

@ -57,6 +57,8 @@ import org.apache.hadoop.yarn.nodelabels.RMNodeAttribute;
import org.apache.hadoop.yarn.nodelabels.StringAttributeValue;
import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAttributesUpdateSchedulerEvent;
import com.google.common.base.Strings;
@ -92,6 +94,7 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
private final ReadLock readLock;
private final WriteLock writeLock;
private RMContext rmContext = null;
public NodeAttributesManagerImpl() {
super("NodeAttributesManagerImpl");
@ -131,7 +134,7 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
}
protected void initNodeAttributeStore(Configuration conf) throws Exception {
this.store =getAttributeStoreClass(conf);
this.store = getAttributeStoreClass(conf);
this.store.init(conf, this);
this.store.recover();
}
@ -206,6 +209,21 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
.handle(new NodeAttributesStoreEvent(nodeAttributeMapping, op));
}
// Map used to notify RM
Map<String, Set<NodeAttribute>> newNodeToAttributesMap =
new HashMap<String, Set<NodeAttribute>>();
nodeAttributeMapping.forEach((k, v) -> {
Host node = nodeCollections.get(k);
newNodeToAttributesMap.put(k, node.attributes.keySet());
});
// Notify RM
if (rmContext != null && rmContext.getDispatcher() != null) {
LOG.info("Updated NodeAttribute event to RM:" + newNodeToAttributesMap
.values());
rmContext.getDispatcher().getEventHandler().handle(
new NodeAttributesUpdateSchedulerEvent(newNodeToAttributesMap));
}
} finally {
writeLock.unlock();
}
@ -703,4 +721,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
store.close();
}
}
public void setRMContext(RMContext context) {
this.rmContext = context;
}
}

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
@ -79,6 +80,8 @@ public abstract class SchedulerNode {
private volatile Set<String> labels = null;
private volatile Set<NodeAttribute> nodeAttributes = null;
// Last updated time
private volatile long lastHeartbeatMonotonicTime;
@ -503,6 +506,14 @@ public abstract class SchedulerNode {
return getNodeID().hashCode();
}
public Set<NodeAttribute> getNodeAttributes() {
return nodeAttributes;
}
public void updateNodeAttributes(Set<NodeAttribute> attributes) {
this.nodeAttributes = attributes;
}
private static class ContainerInfo {
private final RMContainer container;
private boolean launchedOnNode;

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
@ -137,6 +138,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedS
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAttributesUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@ -1767,6 +1769,14 @@ public class CapacityScheduler extends
updateNodeLabelsAndQueueResource(labelUpdateEvent);
}
break;
case NODE_ATTRIBUTES_UPDATE:
{
NodeAttributesUpdateSchedulerEvent attributeUpdateEvent =
(NodeAttributesUpdateSchedulerEvent) event;
updateNodeAttributes(attributeUpdateEvent);
}
break;
case NODE_UPDATE:
{
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
@ -1900,6 +1910,30 @@ public class CapacityScheduler extends
}
}
private void updateNodeAttributes(
NodeAttributesUpdateSchedulerEvent attributeUpdateEvent) {
try {
writeLock.lock();
for (Entry<String, Set<NodeAttribute>> entry : attributeUpdateEvent
.getUpdatedNodeToAttributes().entrySet()) {
String hostname = entry.getKey();
Set<NodeAttribute> attributes = entry.getValue();
List<NodeId> nodeIds = nodeTracker.getNodeIdsByResourceName(hostname);
updateAttributesOnNode(nodeIds, attributes);
}
} finally {
writeLock.unlock();
}
}
private void updateAttributesOnNode(List<NodeId> nodeIds,
Set<NodeAttribute> attributes) {
nodeIds.forEach((k) -> {
SchedulerNode node = nodeTracker.getNode(k);
node.updateNodeAttributes(attributes);
});
}
/**
* Process node labels update.
*/
@ -2768,7 +2802,7 @@ public class CapacityScheduler extends
schedulingRequest, schedulerNode,
rmContext.getPlacementConstraintManager(),
rmContext.getAllocationTagsManager())) {
LOG.debug("Failed to allocate container for application "
LOG.info("Failed to allocate container for application "
+ appAttempt.getApplicationId() + " on node "
+ schedulerNode.getNodeName()
+ " because this allocation violates the"

View File

@ -24,8 +24,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
@ -114,22 +113,92 @@ public final class PlacementConstraintsUtil {
|| maxScopeCardinality <= desiredMaxCardinality);
}
private static boolean canSatisfyNodePartitionConstraintExpresssion(
TargetExpression targetExpression, SchedulerNode schedulerNode) {
private static boolean canSatisfyNodeConstraintExpresssion(
SingleConstraint sc, TargetExpression targetExpression,
SchedulerNode schedulerNode) {
Set<String> values = targetExpression.getTargetValues();
if (values == null || values.isEmpty()) {
return schedulerNode.getPartition().equals(
RMNodeLabelsManager.NO_LABEL);
} else{
String nodePartition = values.iterator().next();
if (!nodePartition.equals(schedulerNode.getPartition())) {
if (targetExpression.getTargetKey().equals(NODE_PARTITION)) {
if (values == null || values.isEmpty()) {
return schedulerNode.getPartition()
.equals(RMNodeLabelsManager.NO_LABEL);
} else {
String nodePartition = values.iterator().next();
if (!nodePartition.equals(schedulerNode.getPartition())) {
return false;
}
}
} else {
NodeAttributeOpCode opCode = sc.getNodeAttributeOpCode();
// compare attributes.
String inputAttribute = values.iterator().next();
NodeAttribute requestAttribute = getNodeConstraintFromRequest(
targetExpression.getTargetKey(), inputAttribute);
if (requestAttribute == null) {
return true;
}
if (schedulerNode.getNodeAttributes() == null ||
!schedulerNode.getNodeAttributes().contains(requestAttribute)) {
if(LOG.isDebugEnabled()) {
LOG.debug("Incoming requestAttribute:" + requestAttribute
+ "is not present in " + schedulerNode.getNodeID());
}
return false;
}
boolean found = false;
for (Iterator<NodeAttribute> it = schedulerNode.getNodeAttributes()
.iterator(); it.hasNext();) {
NodeAttribute nodeAttribute = it.next();
if (LOG.isDebugEnabled()) {
LOG.debug("Starting to compare Incoming requestAttribute :"
+ requestAttribute
+ " with requestAttribute value= " + requestAttribute
.getAttributeValue()
+ ", stored nodeAttribute value=" + nodeAttribute
.getAttributeValue());
}
if (requestAttribute.equals(nodeAttribute)) {
if (isOpCodeMatches(requestAttribute, nodeAttribute, opCode)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Incoming requestAttribute:" + requestAttribute
+ " matches with node:" + schedulerNode.getNodeID());
}
found = true;
return found;
}
}
}
if (!found) {
if(LOG.isDebugEnabled()) {
LOG.info("skip this node:" + schedulerNode.getNodeID()
+ " for requestAttribute:" + requestAttribute);
}
return false;
}
}
return true;
}
private static boolean isOpCodeMatches(NodeAttribute requestAttribute,
NodeAttribute nodeAttribute, NodeAttributeOpCode opCode) {
boolean retCode = false;
switch (opCode) {
case EQ:
retCode = requestAttribute.getAttributeValue()
.equals(nodeAttribute.getAttributeValue());
break;
case NE:
retCode = !(requestAttribute.getAttributeValue()
.equals(nodeAttribute.getAttributeValue()));
break;
default:
break;
}
return retCode;
}
private static boolean canSatisfySingleConstraint(ApplicationId applicationId,
SingleConstraint singleConstraint, SchedulerNode schedulerNode,
AllocationTagsManager tagsManager)
@ -146,10 +215,12 @@ public final class PlacementConstraintsUtil {
singleConstraint, currentExp, schedulerNode, tagsManager)) {
return false;
}
} else if (currentExp.getTargetType().equals(TargetType.NODE_ATTRIBUTE)
&& currentExp.getTargetKey().equals(NODE_PARTITION)) {
// This is a node partition expression, check it.
canSatisfyNodePartitionConstraintExpresssion(currentExp, schedulerNode);
} else if (currentExp.getTargetType().equals(TargetType.NODE_ATTRIBUTE)) {
// This is a node attribute expression, check it.
if (!canSatisfyNodeConstraintExpresssion(singleConstraint, currentExp,
schedulerNode)) {
return false;
}
}
}
// return true if all targetExpressions are satisfied
@ -203,6 +274,11 @@ public final class PlacementConstraintsUtil {
AllocationTagsManager atm)
throws InvalidAllocationTagsQueryException {
if (constraint == null) {
if(LOG.isDebugEnabled()) {
LOG.debug(
"Constraint is found empty during constraint validation for app:"
+ appId);
}
return true;
}
@ -263,4 +339,24 @@ public final class PlacementConstraintsUtil {
pcm.getMultilevelConstraint(applicationId, sourceTags, pc),
schedulerNode, atm);
}
private static NodeAttribute getNodeConstraintFromRequest(String attrKey,
String attrString) {
NodeAttribute nodeAttribute = null;
if(LOG.isDebugEnabled()) {
LOG.debug("Incoming node attribute: " + attrKey + "=" + attrString);
}
// Input node attribute could be like 1.8
String[] name = attrKey.split("/");
if (name == null || name.length == 1) {
nodeAttribute = NodeAttribute
.newInstance(attrKey, NodeAttributeType.STRING, attrString);
} else {
nodeAttribute = NodeAttribute
.newInstance(name[0], name[1], NodeAttributeType.STRING, attrString);
}
return nodeAttribute;
}
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
/**
* Event handler class for Node Attributes which sends events to Scheduler.
*/
public class NodeAttributesUpdateSchedulerEvent extends SchedulerEvent {
private Map<String, Set<NodeAttribute>> nodeToAttributes;
public NodeAttributesUpdateSchedulerEvent(
Map<String, Set<NodeAttribute>> newNodeToAttributesMap) {
super(SchedulerEventType.NODE_ATTRIBUTES_UPDATE);
this.nodeToAttributes = newNodeToAttributesMap;
}
public Map<String, Set<NodeAttribute>> getUpdatedNodeToAttributes() {
return nodeToAttributes;
}
}

View File

@ -26,6 +26,7 @@ public enum SchedulerEventType {
NODE_UPDATE,
NODE_RESOURCE_UPDATE,
NODE_LABELS_UPDATE,
NODE_ATTRIBUTES_UPDATE,
// Source: RMApp
APP_ADDED,

View File

@ -396,6 +396,10 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
SchedulingMode schedulingMode) {
// We will only look at node label = nodeLabelToLookAt according to
// schedulingMode and partition of node.
if(LOG.isDebugEnabled()) {
LOG.debug("precheckNode is invoked for " + schedulerNode.getNodeID() + ","
+ schedulingMode);
}
String nodePartitionToLookAt;
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
nodePartitionToLookAt = schedulerNode.getPartition();