diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 92af0a84d9b..5186224e7c5 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -145,6 +145,8 @@ Release 2.6.0 - UNRELEASED
YARN-913. Add a way to register long-lived services in a YARN cluster.
(stevel)
+ YARN-2493. Added user-APIs for using node-labels. (Wangda Tan via vinodkv)
+
IMPROVEMENTS
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
index 74da4b51205..f186650e0c5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
@@ -18,18 +18,20 @@
package org.apache.hadoop.yarn.api.records;
+import java.util.Set;
+
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;
-import java.util.Set;
-
/**
*
ApplicationSubmissionContext
represents all of the
* information needed by the ResourceManager
to launch
@@ -72,7 +74,8 @@ public abstract class ApplicationSubmissionContext {
Priority priority, ContainerLaunchContext amContainer,
boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
int maxAppAttempts, Resource resource, String applicationType,
- boolean keepContainers) {
+ boolean keepContainers, String appLabelExpression,
+ String amContainerLabelExpression) {
ApplicationSubmissionContext context =
Records.newRecord(ApplicationSubmissionContext.class);
context.setApplicationId(applicationId);
@@ -83,11 +86,30 @@ public abstract class ApplicationSubmissionContext {
context.setUnmanagedAM(isUnmanagedAM);
context.setCancelTokensWhenComplete(cancelTokensWhenComplete);
context.setMaxAppAttempts(maxAppAttempts);
- context.setResource(resource);
context.setApplicationType(applicationType);
context.setKeepContainersAcrossApplicationAttempts(keepContainers);
+ context.setNodeLabelExpression(appLabelExpression);
+
+ ResourceRequest amReq = Records.newRecord(ResourceRequest.class);
+ amReq.setResourceName(ResourceRequest.ANY);
+ amReq.setCapability(resource);
+ amReq.setNumContainers(1);
+ amReq.setRelaxLocality(true);
+ amReq.setNodeLabelExpression(amContainerLabelExpression);
+ context.setAMContainerResourceRequest(amReq);
return context;
}
+
+ public static ApplicationSubmissionContext newInstance(
+ ApplicationId applicationId, String applicationName, String queue,
+ Priority priority, ContainerLaunchContext amContainer,
+ boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
+ int maxAppAttempts, Resource resource, String applicationType,
+ boolean keepContainers) {
+ return newInstance(applicationId, applicationName, queue, priority,
+ amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
+ resource, applicationType, keepContainers, null, null);
+ }
@Public
@Stable
@@ -98,7 +120,7 @@ public abstract class ApplicationSubmissionContext {
int maxAppAttempts, Resource resource, String applicationType) {
return newInstance(applicationId, applicationName, queue, priority,
amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
- resource, applicationType, false);
+ resource, applicationType, false, null, null);
}
@Public
@@ -112,6 +134,29 @@ public abstract class ApplicationSubmissionContext {
amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
resource, null);
}
+
+ @Public
+ @Stable
+ public static ApplicationSubmissionContext newInstance(
+ ApplicationId applicationId, String applicationName, String queue,
+ ContainerLaunchContext amContainer, boolean isUnmanagedAM,
+ boolean cancelTokensWhenComplete, int maxAppAttempts,
+ String applicationType, boolean keepContainers,
+ String appLabelExpression, ResourceRequest resourceRequest) {
+ ApplicationSubmissionContext context =
+ Records.newRecord(ApplicationSubmissionContext.class);
+ context.setApplicationId(applicationId);
+ context.setApplicationName(applicationName);
+ context.setQueue(queue);
+ context.setAMContainerSpec(amContainer);
+ context.setUnmanagedAM(isUnmanagedAM);
+ context.setCancelTokensWhenComplete(cancelTokensWhenComplete);
+ context.setMaxAppAttempts(maxAppAttempts);
+ context.setApplicationType(applicationType);
+ context.setKeepContainersAcrossApplicationAttempts(keepContainers);
+ context.setAMContainerResourceRequest(resourceRequest);
+ return context;
+ }
@Public
@Stable
@@ -290,13 +335,13 @@ public abstract class ApplicationSubmissionContext {
/**
* Get the resource required by the ApplicationMaster
for this
- * application.
+ * application. Please note this will be DEPRECATED, use getResource
+ * in getAMContainerResourceRequest instead.
*
* @return the resource required by the ApplicationMaster
for
* this application.
*/
@Public
- @Stable
public abstract Resource getResource();
/**
@@ -307,7 +352,6 @@ public abstract class ApplicationSubmissionContext {
* for this application.
*/
@Public
- @Stable
public abstract void setResource(Resource resource);
/**
@@ -379,6 +423,54 @@ public abstract class ApplicationSubmissionContext {
@Public
@Stable
public abstract void setApplicationTags(Set tags);
+
+ /**
+ * Get node-label-expression for this app. If this is set, all containers of
+ * this application without setting node-label-expression in ResurceRequest
+ * will get allocated resources on only those nodes that satisfy this
+ * node-label-expression.
+ *
+ * If different node-label-expression of this app and ResourceRequest are set
+ * at the same time, the one set in ResourceRequest will be used when
+ * allocating container
+ *
+ * @return node-label-expression for this app
+ */
+ @Public
+ @Evolving
+ public abstract String getNodeLabelExpression();
+
+ /**
+ * Set node-label-expression for this app
+ * @param nodeLabelExpression node-label-expression of this app
+ */
+ @Public
+ @Evolving
+ public abstract void setNodeLabelExpression(String nodeLabelExpression);
+
+ /**
+ * Get ResourceRequest of AM container, if this is not null, scheduler will
+ * use this to acquire resource for AM container.
+ *
+ * If this is null, scheduler will assemble a ResourceRequest by using
+ * getResource and getPriority of
+ * ApplicationSubmissionContext.
+ *
+ * Number of containers and Priority will be ignore.
+ *
+ * @return ResourceRequest of AM container
+ */
+ @Public
+ @Evolving
+ public abstract ResourceRequest getAMContainerResourceRequest();
+
+ /**
+ * Set ResourceRequest of AM container
+ * @param request of AM container
+ */
+ @Public
+ @Evolving
+ public abstract void setAMContainerResourceRequest(ResourceRequest request);
/**
* Get the attemptFailuresValidityInterval in milliseconds for the application
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
index 86b55d1ebde..7f86caed1ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.api.records;
import java.io.Serializable;
import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.util.Records;
@@ -70,12 +71,22 @@ public abstract class ResourceRequest implements Comparable {
@Stable
public static ResourceRequest newInstance(Priority priority, String hostName,
Resource capability, int numContainers, boolean relaxLocality) {
+ return newInstance(priority, hostName, capability, numContainers,
+ relaxLocality, null);
+ }
+
+ @Public
+ @Stable
+ public static ResourceRequest newInstance(Priority priority, String hostName,
+ Resource capability, int numContainers, boolean relaxLocality,
+ String labelExpression) {
ResourceRequest request = Records.newRecord(ResourceRequest.class);
request.setPriority(priority);
request.setResourceName(hostName);
request.setCapability(capability);
request.setNumContainers(numContainers);
request.setRelaxLocality(relaxLocality);
+ request.setNodeLabelExpression(labelExpression);
return request;
}
@@ -239,6 +250,32 @@ public abstract class ResourceRequest implements Comparable {
@Stable
public abstract void setRelaxLocality(boolean relaxLocality);
+ /**
+ * Get node-label-expression for this Resource Request. If this is set, all
+ * containers allocated to satisfy this resource-request will be only on those
+ * nodes that satisfy this node-label-expression
+ *
+ * @return node-label-expression
+ */
+ @Public
+ @Evolving
+ public abstract String getNodeLabelExpression();
+
+ /**
+ * Set node label expression of this resource request. Now only
+ * support AND(&&), in the future will provide support for OR(||), NOT(!).
+ *
+ * Examples:
+ * - GPU && LARGE_MEM, ask for node has label GPU and LARGE_MEM together
+ * - "" (empty) means ask for node doesn't have label on it, this is default
+ * behavior
+ *
+ * @param nodelabelExpression node-label-expression of this ResourceRequest
+ */
+ @Public
+ @Evolving
+ public abstract void setNodeLabelExpression(String nodelabelExpression);
+
@Override
public int hashCode() {
final int prime = 2153;
@@ -283,6 +320,20 @@ public abstract class ResourceRequest implements Comparable {
return false;
} else if (!priority.equals(other.getPriority()))
return false;
+ if (getNodeLabelExpression() == null) {
+ if (other.getNodeLabelExpression() != null) {
+ return false;
+ }
+ } else {
+ // do normalize on label expression before compare
+ String label1 = getNodeLabelExpression().replaceAll("[\\t ]", "");
+ String label2 =
+ other.getNodeLabelExpression() == null ? null : other
+ .getNodeLabelExpression().replaceAll("[\\t ]", "");
+ if (!label1.equals(label2)) {
+ return false;
+ }
+ }
return true;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index d07ce139c09..3db684b9fba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -241,6 +241,7 @@ message ResourceRequestProto {
optional ResourceProto capability = 3;
optional int32 num_containers = 4;
optional bool relax_locality = 5 [default = true];
+ optional string node_label_expression = 6;
}
enum AMCommandProto {
@@ -294,6 +295,8 @@ message ApplicationSubmissionContextProto {
optional int64 attempt_failures_validity_interval = 13 [default = -1];
optional LogAggregationContextProto log_aggregation_context = 14;
optional ReservationIdProto reservation_id = 15;
+ optional string node_label_expression = 16;
+ optional ResourceRequestProto am_container_resource_request = 17;
}
message LogAggregationContextProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
index 9462a4e5b94..303b4371601 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
@@ -18,7 +18,8 @@
package org.apache.hadoop.yarn.api.records.impl.pb;
-import com.google.common.base.CharMatcher;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -29,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
@@ -38,12 +40,11 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
+import com.google.common.base.CharMatcher;
import com.google.protobuf.TextFormat;
-import java.util.HashSet;
-import java.util.Set;
-
@Private
@Unstable
public class ApplicationSubmissionContextPBImpl
@@ -58,6 +59,7 @@ extends ApplicationSubmissionContext {
private ContainerLaunchContext amContainer = null;
private Resource resource = null;
private Set applicationTags = null;
+ private ResourceRequest amResourceRequest = null;
private LogAggregationContext logAggregationContext = null;
private ReservationId reservationId = null;
@@ -117,6 +119,10 @@ extends ApplicationSubmissionContext {
builder.clearApplicationTags();
builder.addAllApplicationTags(this.applicationTags);
}
+ if (this.amResourceRequest != null) {
+ builder.setAmContainerResourceRequest(
+ convertToProtoFormat(this.amResourceRequest));
+ }
if (this.logAggregationContext != null) {
builder.setLogAggregationContext(
convertToProtoFormat(this.logAggregationContext));
@@ -140,8 +146,7 @@ extends ApplicationSubmissionContext {
}
viaProto = false;
}
-
-
+
@Override
public Priority getPriority() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
@@ -154,7 +159,7 @@ extends ApplicationSubmissionContext {
this.priority = convertFromProtoFormat(p.getPriority());
return this.priority;
}
-
+
@Override
public void setPriority(Priority priority) {
maybeInitBuilder();
@@ -414,6 +419,14 @@ extends ApplicationSubmissionContext {
private PriorityProto convertToProtoFormat(Priority t) {
return ((PriorityPBImpl)t).getProto();
}
+
+ private ResourceRequestPBImpl convertFromProtoFormat(ResourceRequestProto p) {
+ return new ResourceRequestPBImpl(p);
+ }
+
+ private ResourceRequestProto convertToProtoFormat(ResourceRequest t) {
+ return ((ResourceRequestPBImpl)t).getProto();
+ }
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
return new ApplicationIdPBImpl(p);
@@ -428,7 +441,8 @@ extends ApplicationSubmissionContext {
return new ContainerLaunchContextPBImpl(p);
}
- private ContainerLaunchContextProto convertToProtoFormat(ContainerLaunchContext t) {
+ private ContainerLaunchContextProto convertToProtoFormat(
+ ContainerLaunchContext t) {
return ((ContainerLaunchContextPBImpl)t).getProto();
}
@@ -440,6 +454,47 @@ extends ApplicationSubmissionContext {
return ((ResourcePBImpl)t).getProto();
}
+ @Override
+ public String getNodeLabelExpression() {
+ ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasNodeLabelExpression()) {
+ return null;
+ }
+ return p.getNodeLabelExpression();
+ }
+
+ @Override
+ public void setNodeLabelExpression(String labelExpression) {
+ maybeInitBuilder();
+ if (labelExpression == null) {
+ builder.clearNodeLabelExpression();
+ return;
+ }
+ builder.setNodeLabelExpression(labelExpression);
+ }
+
+ @Override
+ public ResourceRequest getAMContainerResourceRequest() {
+ ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.amResourceRequest != null) {
+ return amResourceRequest;
+ } // Else via proto
+ if (!p.hasAmContainerResourceRequest()) {
+ return null;
+ }
+ amResourceRequest = convertFromProtoFormat(p.getAmContainerResourceRequest());
+ return amResourceRequest;
+ }
+
+ @Override
+ public void setAMContainerResourceRequest(ResourceRequest request) {
+ maybeInitBuilder();
+ if (request == null) {
+ builder.clearAmContainerResourceRequest();
+ }
+ this.amResourceRequest = request;
+ }
+
@Override
public long getAttemptFailuresValidityInterval() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
index 22863ac551b..0c8491fc29a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
@@ -186,4 +186,23 @@ public class ResourceRequestPBImpl extends ResourceRequest {
+ ", Location: " + getResourceName()
+ ", Relax Locality: " + getRelaxLocality() + "}";
}
+
+ @Override
+ public String getNodeLabelExpression() {
+ ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasNodeLabelExpression()) {
+ return null;
+ }
+ return (p.getNodeLabelExpression());
+ }
+
+ @Override
+ public void setNodeLabelExpression(String nodeLabelExpression) {
+ maybeInitBuilder();
+ if (nodeLabelExpression == null) {
+ builder.clearNodeLabelExpression();
+ return;
+ }
+ builder.setNodeLabelExpression(nodeLabelExpression);
+ }
}
\ No newline at end of file