From db928556c81e5950b3fe374fa5b99ab26791ef3a Mon Sep 17 00:00:00 2001 From: Konstantinos Karanasos Date: Mon, 13 Nov 2017 15:25:24 -0800 Subject: [PATCH] YARN-6595. [API] Add Placement Constraints at the application level. (Arun Suresh via kkaranasos) --- .../RegisterApplicationMasterRequest.java | 42 ++++- .../api/resource/PlacementConstraint.java | 156 ++++++++++++++++++ .../src/main/proto/yarn_protos.proto | 6 + .../src/main/proto/yarn_service_protos.proto | 1 + ...egisterApplicationMasterRequestPBImpl.java | 106 +++++++++++- .../yarn/api/BasePBImplRecordsTest.java | 11 ++ 6 files changed, 313 insertions(+), 9 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterRequest.java index 395e190ff59..f2d537ae072 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterRequest.java @@ -18,11 +18,16 @@ package org.apache.hadoop.yarn.api.protocolrecords; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.util.Records; - /** * The request sent by the {@code ApplicationMaster} to {@code ResourceManager} * on registration. @@ -132,4 +137,39 @@ public abstract class RegisterApplicationMasterRequest { @Public @Stable public abstract void setTrackingUrl(String trackingUrl); + + /** + * Return all Placement Constraints specified at the Application level. The + * mapping is from a set of allocation tags to a + * PlacementConstraint associated with the tags, i.e., each + * {@link org.apache.hadoop.yarn.api.records.SchedulingRequest} that has those + * tags will be placed taking into account the corresponding constraint. + * + * @return A map of Placement Constraints. + */ + @Public + @Unstable + public Map, PlacementConstraint> getPlacementConstraints() { + return new HashMap<>(); + } + + /** + * Set Placement Constraints applicable to the + * {@link org.apache.hadoop.yarn.api.records.SchedulingRequest}s + * of this application. + * The mapping is from a set of allocation tags to a + * PlacementConstraint associated with the tags. + * For example: + * Map < + * <hb_regionserver> -> node_anti_affinity, + * <hb_regionserver, hb_master> -> rack_affinity, + * ... + * > + * @param placementConstraints Placement Constraint Mapping. + */ + @Public + @Unstable + public void setPlacementConstraints( + Map, PlacementConstraint> placementConstraints) { + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java index f0e398209d3..b6e851ac161 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java @@ -54,6 +54,26 @@ public class PlacementConstraint { return constraintExpr; } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof PlacementConstraint)) { + return false; + } + + PlacementConstraint that = (PlacementConstraint) o; + + return getConstraintExpr() != null ? getConstraintExpr().equals(that + .getConstraintExpr()) : that.getConstraintExpr() == null; + } + + @Override + public int hashCode() { + return getConstraintExpr() != null ? getConstraintExpr().hashCode() : 0; + } + /** * Interface used to enable the elements of the constraint tree to be visited. */ @@ -173,6 +193,38 @@ public class PlacementConstraint { return targetExpressions; } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof SingleConstraint)) { + return false; + } + + SingleConstraint that = (SingleConstraint) o; + + if (getMinCardinality() != that.getMinCardinality()) { + return false; + } + if (getMaxCardinality() != that.getMaxCardinality()) { + return false; + } + if (!getScope().equals(that.getScope())) { + return false; + } + return getTargetExpressions().equals(that.getTargetExpressions()); + } + + @Override + public int hashCode() { + int result = getScope().hashCode(); + result = 31 * result + getMinCardinality(); + result = 31 * result + getMaxCardinality(); + result = 31 * result + getTargetExpressions().hashCode(); + return result; + } + @Override public T accept(Visitor visitor) { return visitor.visit(this); @@ -331,6 +383,34 @@ public class PlacementConstraint { return targetExpressions; } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TargetConstraint)) { + return false; + } + + TargetConstraint that = (TargetConstraint) o; + + if (getOp() != that.getOp()) { + return false; + } + if (!getScope().equals(that.getScope())) { + return false; + } + return getTargetExpressions().equals(that.getTargetExpressions()); + } + + @Override + public int hashCode() { + int result = getOp().hashCode(); + result = 31 * result + getScope().hashCode(); + result = 31 * result + getTargetExpressions().hashCode(); + return result; + } + @Override public T accept(Visitor visitor) { return visitor.visit(this); @@ -388,6 +468,34 @@ public class PlacementConstraint { public T accept(Visitor visitor) { return visitor.visit(this); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + CardinalityConstraint that = (CardinalityConstraint) o; + + if (minCardinality != that.minCardinality) { + return false; + } + if (maxCardinality != that.maxCardinality) { + return false; + } + return scope != null ? scope.equals(that.scope) : that.scope == null; + } + + @Override + public int hashCode() { + int result = scope != null ? scope.hashCode() : 0; + result = 31 * result + minCardinality; + result = 31 * result + maxCardinality; + return result; + } } /** @@ -406,6 +514,25 @@ public class PlacementConstraint { * @return the children of the composite constraint */ public abstract List getChildren(); + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + return getChildren() != null ? getChildren().equals( + ((CompositeConstraint)o).getChildren()) : + ((CompositeConstraint)o).getChildren() == null; + } + + @Override + public int hashCode() { + return getChildren() != null ? getChildren().hashCode() : 0; + } } /** @@ -563,5 +690,34 @@ public class PlacementConstraint { public T accept(Visitor visitor) { return visitor.visit(this); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + TimedPlacementConstraint that = (TimedPlacementConstraint) o; + + if (schedulingDelay != that.schedulingDelay) { + return false; + } + if (constraint != null ? !constraint.equals(that.constraint) : + that.constraint != null) { + return false; + } + return delayUnit == that.delayUnit; + } + + @Override + public int hashCode() { + int result = constraint != null ? constraint.hashCode() : 0; + result = 31 * result + (int) (schedulingDelay ^ (schedulingDelay >>> 32)); + result = 31 * result + (delayUnit != null ? delayUnit.hashCode() : 0); + return result; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index d24f8639ef9..fdc39a777fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -649,6 +649,12 @@ message CompositePlacementConstraintProto { repeated TimedPlacementConstraintProto timedChildConstraints = 3; } +// This associates a set of allocation tags to a Placement Constraint. +message PlacementConstraintMapEntryProto { + repeated string allocation_tags = 1; + optional PlacementConstraintProto placement_constraint = 2; +} + //////////////////////////////////////////////////////////////////////// ////// From reservation_protocol ///////////////////////////////////// //////////////////////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 4e97c7442d8..68e585d15bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -38,6 +38,7 @@ message RegisterApplicationMasterRequestProto { optional string host = 1; optional int32 rpc_port = 2; optional string tracking_url = 3; + repeated PlacementConstraintMapEntryProto placement_constraints = 4; } message RegisterApplicationMasterResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterRequestPBImpl.java index 037dfd98760..64bee85bb3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterRequestPBImpl.java @@ -21,24 +21,41 @@ package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.pb.PlacementConstraintFromProtoConverter; +import org.apache.hadoop.yarn.api.pb.PlacementConstraintToProtoConverter; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; + +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProtoOrBuilder; import com.google.protobuf.TextFormat; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + @Private @Unstable -public class RegisterApplicationMasterRequestPBImpl extends RegisterApplicationMasterRequest { - RegisterApplicationMasterRequestProto proto = RegisterApplicationMasterRequestProto.getDefaultInstance(); - RegisterApplicationMasterRequestProto.Builder builder = null; +public class RegisterApplicationMasterRequestPBImpl + extends RegisterApplicationMasterRequest { + private RegisterApplicationMasterRequestProto proto = + RegisterApplicationMasterRequestProto.getDefaultInstance(); + private RegisterApplicationMasterRequestProto.Builder builder = null; + private Map, PlacementConstraint> placementConstraints = null; boolean viaProto = false; public RegisterApplicationMasterRequestPBImpl() { builder = RegisterApplicationMasterRequestProto.newBuilder(); } - public RegisterApplicationMasterRequestPBImpl(RegisterApplicationMasterRequestProto proto) { + public RegisterApplicationMasterRequestPBImpl( + RegisterApplicationMasterRequestProto proto) { this.proto = proto; viaProto = true; } @@ -71,6 +88,30 @@ public class RegisterApplicationMasterRequestPBImpl extends RegisterApplicationM } private void mergeLocalToBuilder() { + if (this.placementConstraints != null) { + addPlacementConstraintMap(); + } + } + + private void addPlacementConstraintMap() { + maybeInitBuilder(); + builder.clearPlacementConstraints(); + if (this.placementConstraints == null) { + return; + } + List protoList = + new ArrayList<>(); + for (Map.Entry, PlacementConstraint> entry : + this.placementConstraints.entrySet()) { + protoList.add( + YarnProtos.PlacementConstraintMapEntryProto.newBuilder() + .addAllAllocationTags(entry.getKey()) + .setPlacementConstraint( + new PlacementConstraintToProtoConverter( + entry.getValue()).convert()) + .build()); + } + builder.addAllPlacementConstraints(protoList); } private void mergeLocalToProto() { @@ -90,7 +131,8 @@ public class RegisterApplicationMasterRequestPBImpl extends RegisterApplicationM @Override public String getHost() { - RegisterApplicationMasterRequestProtoOrBuilder p = viaProto ? proto : builder; + RegisterApplicationMasterRequestProtoOrBuilder p = + viaProto ? proto : builder; return p.getHost(); } @@ -106,7 +148,8 @@ public class RegisterApplicationMasterRequestPBImpl extends RegisterApplicationM @Override public int getRpcPort() { - RegisterApplicationMasterRequestProtoOrBuilder p = viaProto ? proto : builder; + RegisterApplicationMasterRequestProtoOrBuilder p = + viaProto ? proto : builder; return p.getRpcPort(); } @@ -118,7 +161,8 @@ public class RegisterApplicationMasterRequestPBImpl extends RegisterApplicationM @Override public String getTrackingUrl() { - RegisterApplicationMasterRequestProtoOrBuilder p = viaProto ? proto : builder; + RegisterApplicationMasterRequestProtoOrBuilder p = + viaProto ? proto : builder; return p.getTrackingUrl(); } @@ -131,4 +175,50 @@ public class RegisterApplicationMasterRequestPBImpl extends RegisterApplicationM } builder.setTrackingUrl(url); } -} + + private void initPlacementConstraintMap() { + if (this.placementConstraints != null) { + return; + } + RegisterApplicationMasterRequestProtoOrBuilder p = + viaProto ? proto : builder; + List pcmList = + p.getPlacementConstraintsList(); + this.placementConstraints = new HashMap<>(); + for (YarnProtos.PlacementConstraintMapEntryProto e : pcmList) { + this.placementConstraints.put( + new HashSet<>(e.getAllocationTagsList()), + new PlacementConstraintFromProtoConverter( + e.getPlacementConstraint()).convert()); + } + } + + @Override + public Map, PlacementConstraint> getPlacementConstraints() { + initPlacementConstraintMap(); + return this.placementConstraints; + } + + @Override + public void setPlacementConstraints( + Map, PlacementConstraint> constraints) { + maybeInitBuilder(); + if (constraints == null) { + builder.clearPlacementConstraints(); + } else { + removeEmptyKeys(constraints); + } + this.placementConstraints = constraints; + } + + private void removeEmptyKeys( + Map, PlacementConstraint> constraintMap) { + Iterator> iter = constraintMap.keySet().iterator(); + while (iter.hasNext()) { + Set aTags = iter.next(); + if (aTags.size() == 0) { + iter.remove(); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/BasePBImplRecordsTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/BasePBImplRecordsTest.java index 86946518db3..ebd66af2618 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/BasePBImplRecordsTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/BasePBImplRecordsTest.java @@ -22,12 +22,19 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraints; import org.junit.Assert; import java.lang.reflect.*; import java.nio.ByteBuffer; import java.util.*; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints + .PlacementTargets.allocationTag; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn; + /** * Generic helper class to validate protocol records. */ @@ -85,6 +92,10 @@ public class BasePBImplRecordsTest { ByteBuffer buff = ByteBuffer.allocate(4); rand.nextBytes(buff.array()); return buff; + } else if (type.equals(PlacementConstraint.class)) { + PlacementConstraint.AbstractConstraint sConstraintExpr = + targetIn(NODE, allocationTag("foo")); + ret = PlacementConstraints.build(sConstraintExpr); } } else if (type instanceof ParameterizedType) { ParameterizedType pt = (ParameterizedType)type;