diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 54ca114a91c..597046deb55 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -88,6 +88,8 @@ Release 2.6.0 - UNRELEASED YARN-913. Add a way to register long-lived services in a YARN cluster. (stevel) + YARN-2493. Added user-APIs for using node-labels. (Wangda Tan via vinodkv) + IMPROVEMENTS YARN-2242. Improve exception information on AM launch crashes. (Li Lu diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index 74da4b51205..f186650e0c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -18,18 +18,20 @@ package org.apache.hadoop.yarn.api.records; +import java.util.Set; + import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.util.Records; -import java.util.Set; - /** *

ApplicationSubmissionContext represents all of the * information needed by the ResourceManager to launch @@ -72,7 +74,8 @@ public abstract class ApplicationSubmissionContext { Priority priority, ContainerLaunchContext amContainer, boolean isUnmanagedAM, boolean cancelTokensWhenComplete, int maxAppAttempts, Resource resource, String applicationType, - boolean keepContainers) { + boolean keepContainers, String appLabelExpression, + String amContainerLabelExpression) { ApplicationSubmissionContext context = Records.newRecord(ApplicationSubmissionContext.class); context.setApplicationId(applicationId); @@ -83,11 +86,30 @@ public abstract class ApplicationSubmissionContext { context.setUnmanagedAM(isUnmanagedAM); context.setCancelTokensWhenComplete(cancelTokensWhenComplete); context.setMaxAppAttempts(maxAppAttempts); - context.setResource(resource); context.setApplicationType(applicationType); context.setKeepContainersAcrossApplicationAttempts(keepContainers); + context.setNodeLabelExpression(appLabelExpression); + + ResourceRequest amReq = Records.newRecord(ResourceRequest.class); + amReq.setResourceName(ResourceRequest.ANY); + amReq.setCapability(resource); + amReq.setNumContainers(1); + amReq.setRelaxLocality(true); + amReq.setNodeLabelExpression(amContainerLabelExpression); + context.setAMContainerResourceRequest(amReq); return context; } + + public static ApplicationSubmissionContext newInstance( + ApplicationId applicationId, String applicationName, String queue, + Priority priority, ContainerLaunchContext amContainer, + boolean isUnmanagedAM, boolean cancelTokensWhenComplete, + int maxAppAttempts, Resource resource, String applicationType, + boolean keepContainers) { + return newInstance(applicationId, applicationName, queue, priority, + amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts, + resource, applicationType, keepContainers, null, null); + } @Public @Stable @@ -98,7 +120,7 @@ public abstract class ApplicationSubmissionContext { int maxAppAttempts, Resource resource, String applicationType) { return newInstance(applicationId, applicationName, queue, priority, amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts, - resource, applicationType, false); + resource, applicationType, false, null, null); } @Public @@ -112,6 +134,29 @@ public abstract class ApplicationSubmissionContext { amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts, resource, null); } + + @Public + @Stable + public static ApplicationSubmissionContext newInstance( + ApplicationId applicationId, String applicationName, String queue, + ContainerLaunchContext amContainer, boolean isUnmanagedAM, + boolean cancelTokensWhenComplete, int maxAppAttempts, + String applicationType, boolean keepContainers, + String appLabelExpression, ResourceRequest resourceRequest) { + ApplicationSubmissionContext context = + Records.newRecord(ApplicationSubmissionContext.class); + context.setApplicationId(applicationId); + context.setApplicationName(applicationName); + context.setQueue(queue); + context.setAMContainerSpec(amContainer); + context.setUnmanagedAM(isUnmanagedAM); + context.setCancelTokensWhenComplete(cancelTokensWhenComplete); + context.setMaxAppAttempts(maxAppAttempts); + context.setApplicationType(applicationType); + context.setKeepContainersAcrossApplicationAttempts(keepContainers); + context.setAMContainerResourceRequest(resourceRequest); + return context; + } @Public @Stable @@ -290,13 +335,13 @@ public abstract class ApplicationSubmissionContext { /** * Get the resource required by the ApplicationMaster for this - * application. + * application. Please note this will be DEPRECATED, use getResource + * in getAMContainerResourceRequest instead. * * @return the resource required by the ApplicationMaster for * this application. */ @Public - @Stable public abstract Resource getResource(); /** @@ -307,7 +352,6 @@ public abstract class ApplicationSubmissionContext { * for this application. */ @Public - @Stable public abstract void setResource(Resource resource); /** @@ -379,6 +423,54 @@ public abstract class ApplicationSubmissionContext { @Public @Stable public abstract void setApplicationTags(Set tags); + + /** + * Get node-label-expression for this app. If this is set, all containers of + * this application without setting node-label-expression in ResurceRequest + * will get allocated resources on only those nodes that satisfy this + * node-label-expression. + * + * If different node-label-expression of this app and ResourceRequest are set + * at the same time, the one set in ResourceRequest will be used when + * allocating container + * + * @return node-label-expression for this app + */ + @Public + @Evolving + public abstract String getNodeLabelExpression(); + + /** + * Set node-label-expression for this app + * @param nodeLabelExpression node-label-expression of this app + */ + @Public + @Evolving + public abstract void setNodeLabelExpression(String nodeLabelExpression); + + /** + * Get ResourceRequest of AM container, if this is not null, scheduler will + * use this to acquire resource for AM container. + * + * If this is null, scheduler will assemble a ResourceRequest by using + * getResource and getPriority of + * ApplicationSubmissionContext. + * + * Number of containers and Priority will be ignore. + * + * @return ResourceRequest of AM container + */ + @Public + @Evolving + public abstract ResourceRequest getAMContainerResourceRequest(); + + /** + * Set ResourceRequest of AM container + * @param request of AM container + */ + @Public + @Evolving + public abstract void setAMContainerResourceRequest(ResourceRequest request); /** * Get the attemptFailuresValidityInterval in milliseconds for the application diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java index 86b55d1ebde..7f86caed1ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.api.records; import java.io.Serializable; import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.util.Records; @@ -70,12 +71,22 @@ public abstract class ResourceRequest implements Comparable { @Stable public static ResourceRequest newInstance(Priority priority, String hostName, Resource capability, int numContainers, boolean relaxLocality) { + return newInstance(priority, hostName, capability, numContainers, + relaxLocality, null); + } + + @Public + @Stable + public static ResourceRequest newInstance(Priority priority, String hostName, + Resource capability, int numContainers, boolean relaxLocality, + String labelExpression) { ResourceRequest request = Records.newRecord(ResourceRequest.class); request.setPriority(priority); request.setResourceName(hostName); request.setCapability(capability); request.setNumContainers(numContainers); request.setRelaxLocality(relaxLocality); + request.setNodeLabelExpression(labelExpression); return request; } @@ -239,6 +250,32 @@ public abstract class ResourceRequest implements Comparable { @Stable public abstract void setRelaxLocality(boolean relaxLocality); + /** + * Get node-label-expression for this Resource Request. If this is set, all + * containers allocated to satisfy this resource-request will be only on those + * nodes that satisfy this node-label-expression + * + * @return node-label-expression + */ + @Public + @Evolving + public abstract String getNodeLabelExpression(); + + /** + * Set node label expression of this resource request. Now only + * support AND(&&), in the future will provide support for OR(||), NOT(!). + * + * Examples: + * - GPU && LARGE_MEM, ask for node has label GPU and LARGE_MEM together + * - "" (empty) means ask for node doesn't have label on it, this is default + * behavior + * + * @param nodelabelExpression node-label-expression of this ResourceRequest + */ + @Public + @Evolving + public abstract void setNodeLabelExpression(String nodelabelExpression); + @Override public int hashCode() { final int prime = 2153; @@ -283,6 +320,20 @@ public abstract class ResourceRequest implements Comparable { return false; } else if (!priority.equals(other.getPriority())) return false; + if (getNodeLabelExpression() == null) { + if (other.getNodeLabelExpression() != null) { + return false; + } + } else { + // do normalize on label expression before compare + String label1 = getNodeLabelExpression().replaceAll("[\\t ]", ""); + String label2 = + other.getNodeLabelExpression() == null ? null : other + .getNodeLabelExpression().replaceAll("[\\t ]", ""); + if (!label1.equals(label2)) { + return false; + } + } return true; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index d07ce139c09..3db684b9fba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -241,6 +241,7 @@ message ResourceRequestProto { optional ResourceProto capability = 3; optional int32 num_containers = 4; optional bool relax_locality = 5 [default = true]; + optional string node_label_expression = 6; } enum AMCommandProto { @@ -294,6 +295,8 @@ message ApplicationSubmissionContextProto { optional int64 attempt_failures_validity_interval = 13 [default = -1]; optional LogAggregationContextProto log_aggregation_context = 14; optional ReservationIdProto reservation_id = 15; + optional string node_label_expression = 16; + optional ResourceRequestProto am_container_resource_request = 17; } message LogAggregationContextProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index 9462a4e5b94..303b4371601 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -18,7 +18,8 @@ package org.apache.hadoop.yarn.api.records.impl.pb; -import com.google.common.base.CharMatcher; +import java.util.HashSet; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -29,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto; @@ -38,12 +40,11 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; import org.apache.hadoop.yarn.proto.YarnProtos.ReservationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto; +import com.google.common.base.CharMatcher; import com.google.protobuf.TextFormat; -import java.util.HashSet; -import java.util.Set; - @Private @Unstable public class ApplicationSubmissionContextPBImpl @@ -58,6 +59,7 @@ extends ApplicationSubmissionContext { private ContainerLaunchContext amContainer = null; private Resource resource = null; private Set applicationTags = null; + private ResourceRequest amResourceRequest = null; private LogAggregationContext logAggregationContext = null; private ReservationId reservationId = null; @@ -117,6 +119,10 @@ extends ApplicationSubmissionContext { builder.clearApplicationTags(); builder.addAllApplicationTags(this.applicationTags); } + if (this.amResourceRequest != null) { + builder.setAmContainerResourceRequest( + convertToProtoFormat(this.amResourceRequest)); + } if (this.logAggregationContext != null) { builder.setLogAggregationContext( convertToProtoFormat(this.logAggregationContext)); @@ -140,8 +146,7 @@ extends ApplicationSubmissionContext { } viaProto = false; } - - + @Override public Priority getPriority() { ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; @@ -154,7 +159,7 @@ extends ApplicationSubmissionContext { this.priority = convertFromProtoFormat(p.getPriority()); return this.priority; } - + @Override public void setPriority(Priority priority) { maybeInitBuilder(); @@ -414,6 +419,14 @@ extends ApplicationSubmissionContext { private PriorityProto convertToProtoFormat(Priority t) { return ((PriorityPBImpl)t).getProto(); } + + private ResourceRequestPBImpl convertFromProtoFormat(ResourceRequestProto p) { + return new ResourceRequestPBImpl(p); + } + + private ResourceRequestProto convertToProtoFormat(ResourceRequest t) { + return ((ResourceRequestPBImpl)t).getProto(); + } private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { return new ApplicationIdPBImpl(p); @@ -428,7 +441,8 @@ extends ApplicationSubmissionContext { return new ContainerLaunchContextPBImpl(p); } - private ContainerLaunchContextProto convertToProtoFormat(ContainerLaunchContext t) { + private ContainerLaunchContextProto convertToProtoFormat( + ContainerLaunchContext t) { return ((ContainerLaunchContextPBImpl)t).getProto(); } @@ -440,6 +454,47 @@ extends ApplicationSubmissionContext { return ((ResourcePBImpl)t).getProto(); } + @Override + public String getNodeLabelExpression() { + ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasNodeLabelExpression()) { + return null; + } + return p.getNodeLabelExpression(); + } + + @Override + public void setNodeLabelExpression(String labelExpression) { + maybeInitBuilder(); + if (labelExpression == null) { + builder.clearNodeLabelExpression(); + return; + } + builder.setNodeLabelExpression(labelExpression); + } + + @Override + public ResourceRequest getAMContainerResourceRequest() { + ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; + if (this.amResourceRequest != null) { + return amResourceRequest; + } // Else via proto + if (!p.hasAmContainerResourceRequest()) { + return null; + } + amResourceRequest = convertFromProtoFormat(p.getAmContainerResourceRequest()); + return amResourceRequest; + } + + @Override + public void setAMContainerResourceRequest(ResourceRequest request) { + maybeInitBuilder(); + if (request == null) { + builder.clearAmContainerResourceRequest(); + } + this.amResourceRequest = request; + } + @Override public long getAttemptFailuresValidityInterval() { ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java index 22863ac551b..0c8491fc29a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java @@ -186,4 +186,23 @@ public class ResourceRequestPBImpl extends ResourceRequest { + ", Location: " + getResourceName() + ", Relax Locality: " + getRelaxLocality() + "}"; } + + @Override + public String getNodeLabelExpression() { + ResourceRequestProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasNodeLabelExpression()) { + return null; + } + return (p.getNodeLabelExpression()); + } + + @Override + public void setNodeLabelExpression(String nodeLabelExpression) { + maybeInitBuilder(); + if (nodeLabelExpression == null) { + builder.clearNodeLabelExpression(); + return; + } + builder.setNodeLabelExpression(nodeLabelExpression); + } } \ No newline at end of file