YARN-2493. Added user-APIs for using node-labels. Contributed by Wangda Tan.

This commit is contained in:
Vinod Kumar Vavilapalli 2014-10-09 11:26:26 -07:00
parent 1123a06e36
commit 180afa2f86
6 changed files with 238 additions and 16 deletions

View File

@ -145,6 +145,8 @@ Release 2.6.0 - UNRELEASED
YARN-913. Add a way to register long-lived services in a YARN cluster. YARN-913. Add a way to register long-lived services in a YARN cluster.
(stevel) (stevel)
YARN-2493. Added user-APIs for using node-labels. (Wangda Tan via vinodkv)
IMPROVEMENTS IMPROVEMENTS
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc

View File

@ -18,18 +18,20 @@
package org.apache.hadoop.yarn.api.records; package org.apache.hadoop.yarn.api.records;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import java.util.Set;
/** /**
* <p><code>ApplicationSubmissionContext</code> represents all of the * <p><code>ApplicationSubmissionContext</code> represents all of the
* information needed by the <code>ResourceManager</code> to launch * information needed by the <code>ResourceManager</code> to launch
@ -72,7 +74,8 @@ public abstract class ApplicationSubmissionContext {
Priority priority, ContainerLaunchContext amContainer, Priority priority, ContainerLaunchContext amContainer,
boolean isUnmanagedAM, boolean cancelTokensWhenComplete, boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
int maxAppAttempts, Resource resource, String applicationType, int maxAppAttempts, Resource resource, String applicationType,
boolean keepContainers) { boolean keepContainers, String appLabelExpression,
String amContainerLabelExpression) {
ApplicationSubmissionContext context = ApplicationSubmissionContext context =
Records.newRecord(ApplicationSubmissionContext.class); Records.newRecord(ApplicationSubmissionContext.class);
context.setApplicationId(applicationId); context.setApplicationId(applicationId);
@ -83,12 +86,31 @@ public abstract class ApplicationSubmissionContext {
context.setUnmanagedAM(isUnmanagedAM); context.setUnmanagedAM(isUnmanagedAM);
context.setCancelTokensWhenComplete(cancelTokensWhenComplete); context.setCancelTokensWhenComplete(cancelTokensWhenComplete);
context.setMaxAppAttempts(maxAppAttempts); context.setMaxAppAttempts(maxAppAttempts);
context.setResource(resource);
context.setApplicationType(applicationType); context.setApplicationType(applicationType);
context.setKeepContainersAcrossApplicationAttempts(keepContainers); context.setKeepContainersAcrossApplicationAttempts(keepContainers);
context.setNodeLabelExpression(appLabelExpression);
ResourceRequest amReq = Records.newRecord(ResourceRequest.class);
amReq.setResourceName(ResourceRequest.ANY);
amReq.setCapability(resource);
amReq.setNumContainers(1);
amReq.setRelaxLocality(true);
amReq.setNodeLabelExpression(amContainerLabelExpression);
context.setAMContainerResourceRequest(amReq);
return context; return context;
} }
public static ApplicationSubmissionContext newInstance(
ApplicationId applicationId, String applicationName, String queue,
Priority priority, ContainerLaunchContext amContainer,
boolean isUnmanagedAM, boolean cancelTokensWhenComplete,
int maxAppAttempts, Resource resource, String applicationType,
boolean keepContainers) {
return newInstance(applicationId, applicationName, queue, priority,
amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
resource, applicationType, keepContainers, null, null);
}
@Public @Public
@Stable @Stable
public static ApplicationSubmissionContext newInstance( public static ApplicationSubmissionContext newInstance(
@ -98,7 +120,7 @@ public abstract class ApplicationSubmissionContext {
int maxAppAttempts, Resource resource, String applicationType) { int maxAppAttempts, Resource resource, String applicationType) {
return newInstance(applicationId, applicationName, queue, priority, return newInstance(applicationId, applicationName, queue, priority,
amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts, amContainer, isUnmanagedAM, cancelTokensWhenComplete, maxAppAttempts,
resource, applicationType, false); resource, applicationType, false, null, null);
} }
@Public @Public
@ -113,6 +135,29 @@ public abstract class ApplicationSubmissionContext {
resource, null); resource, null);
} }
@Public
@Stable
public static ApplicationSubmissionContext newInstance(
ApplicationId applicationId, String applicationName, String queue,
ContainerLaunchContext amContainer, boolean isUnmanagedAM,
boolean cancelTokensWhenComplete, int maxAppAttempts,
String applicationType, boolean keepContainers,
String appLabelExpression, ResourceRequest resourceRequest) {
ApplicationSubmissionContext context =
Records.newRecord(ApplicationSubmissionContext.class);
context.setApplicationId(applicationId);
context.setApplicationName(applicationName);
context.setQueue(queue);
context.setAMContainerSpec(amContainer);
context.setUnmanagedAM(isUnmanagedAM);
context.setCancelTokensWhenComplete(cancelTokensWhenComplete);
context.setMaxAppAttempts(maxAppAttempts);
context.setApplicationType(applicationType);
context.setKeepContainersAcrossApplicationAttempts(keepContainers);
context.setAMContainerResourceRequest(resourceRequest);
return context;
}
@Public @Public
@Stable @Stable
public static ApplicationSubmissionContext newInstance( public static ApplicationSubmissionContext newInstance(
@ -290,13 +335,13 @@ public abstract class ApplicationSubmissionContext {
/** /**
* Get the resource required by the <code>ApplicationMaster</code> for this * Get the resource required by the <code>ApplicationMaster</code> for this
* application. * application. Please note this will be DEPRECATED, use <em>getResource</em>
* in <em>getAMContainerResourceRequest</em> instead.
* *
* @return the resource required by the <code>ApplicationMaster</code> for * @return the resource required by the <code>ApplicationMaster</code> for
* this application. * this application.
*/ */
@Public @Public
@Stable
public abstract Resource getResource(); public abstract Resource getResource();
/** /**
@ -307,7 +352,6 @@ public abstract class ApplicationSubmissionContext {
* for this application. * for this application.
*/ */
@Public @Public
@Stable
public abstract void setResource(Resource resource); public abstract void setResource(Resource resource);
/** /**
@ -380,6 +424,54 @@ public abstract class ApplicationSubmissionContext {
@Stable @Stable
public abstract void setApplicationTags(Set<String> tags); public abstract void setApplicationTags(Set<String> tags);
/**
* Get node-label-expression for this app. If this is set, all containers of
* this application without setting node-label-expression in ResurceRequest
* will get allocated resources on only those nodes that satisfy this
* node-label-expression.
*
* If different node-label-expression of this app and ResourceRequest are set
* at the same time, the one set in ResourceRequest will be used when
* allocating container
*
* @return node-label-expression for this app
*/
@Public
@Evolving
public abstract String getNodeLabelExpression();
/**
* Set node-label-expression for this app
* @param nodeLabelExpression node-label-expression of this app
*/
@Public
@Evolving
public abstract void setNodeLabelExpression(String nodeLabelExpression);
/**
* Get ResourceRequest of AM container, if this is not null, scheduler will
* use this to acquire resource for AM container.
*
* If this is null, scheduler will assemble a ResourceRequest by using
* <em>getResource</em> and <em>getPriority</em> of
* <em>ApplicationSubmissionContext</em>.
*
* Number of containers and Priority will be ignore.
*
* @return ResourceRequest of AM container
*/
@Public
@Evolving
public abstract ResourceRequest getAMContainerResourceRequest();
/**
* Set ResourceRequest of AM container
* @param request of AM container
*/
@Public
@Evolving
public abstract void setAMContainerResourceRequest(ResourceRequest request);
/** /**
* Get the attemptFailuresValidityInterval in milliseconds for the application * Get the attemptFailuresValidityInterval in milliseconds for the application
* *

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.api.records;
import java.io.Serializable; import java.io.Serializable;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -70,12 +71,22 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
@Stable @Stable
public static ResourceRequest newInstance(Priority priority, String hostName, public static ResourceRequest newInstance(Priority priority, String hostName,
Resource capability, int numContainers, boolean relaxLocality) { Resource capability, int numContainers, boolean relaxLocality) {
return newInstance(priority, hostName, capability, numContainers,
relaxLocality, null);
}
@Public
@Stable
public static ResourceRequest newInstance(Priority priority, String hostName,
Resource capability, int numContainers, boolean relaxLocality,
String labelExpression) {
ResourceRequest request = Records.newRecord(ResourceRequest.class); ResourceRequest request = Records.newRecord(ResourceRequest.class);
request.setPriority(priority); request.setPriority(priority);
request.setResourceName(hostName); request.setResourceName(hostName);
request.setCapability(capability); request.setCapability(capability);
request.setNumContainers(numContainers); request.setNumContainers(numContainers);
request.setRelaxLocality(relaxLocality); request.setRelaxLocality(relaxLocality);
request.setNodeLabelExpression(labelExpression);
return request; return request;
} }
@ -239,6 +250,32 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
@Stable @Stable
public abstract void setRelaxLocality(boolean relaxLocality); public abstract void setRelaxLocality(boolean relaxLocality);
/**
* Get node-label-expression for this Resource Request. If this is set, all
* containers allocated to satisfy this resource-request will be only on those
* nodes that satisfy this node-label-expression
*
* @return node-label-expression
*/
@Public
@Evolving
public abstract String getNodeLabelExpression();
/**
* Set node label expression of this resource request. Now only
* support AND(&&), in the future will provide support for OR(||), NOT(!).
*
* Examples:
* - GPU && LARGE_MEM, ask for node has label GPU and LARGE_MEM together
* - "" (empty) means ask for node doesn't have label on it, this is default
* behavior
*
* @param nodelabelExpression node-label-expression of this ResourceRequest
*/
@Public
@Evolving
public abstract void setNodeLabelExpression(String nodelabelExpression);
@Override @Override
public int hashCode() { public int hashCode() {
final int prime = 2153; final int prime = 2153;
@ -283,6 +320,20 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
return false; return false;
} else if (!priority.equals(other.getPriority())) } else if (!priority.equals(other.getPriority()))
return false; return false;
if (getNodeLabelExpression() == null) {
if (other.getNodeLabelExpression() != null) {
return false;
}
} else {
// do normalize on label expression before compare
String label1 = getNodeLabelExpression().replaceAll("[\\t ]", "");
String label2 =
other.getNodeLabelExpression() == null ? null : other
.getNodeLabelExpression().replaceAll("[\\t ]", "");
if (!label1.equals(label2)) {
return false;
}
}
return true; return true;
} }

View File

@ -241,6 +241,7 @@ message ResourceRequestProto {
optional ResourceProto capability = 3; optional ResourceProto capability = 3;
optional int32 num_containers = 4; optional int32 num_containers = 4;
optional bool relax_locality = 5 [default = true]; optional bool relax_locality = 5 [default = true];
optional string node_label_expression = 6;
} }
enum AMCommandProto { enum AMCommandProto {
@ -294,6 +295,8 @@ message ApplicationSubmissionContextProto {
optional int64 attempt_failures_validity_interval = 13 [default = -1]; optional int64 attempt_failures_validity_interval = 13 [default = -1];
optional LogAggregationContextProto log_aggregation_context = 14; optional LogAggregationContextProto log_aggregation_context = 14;
optional ReservationIdProto reservation_id = 15; optional ReservationIdProto reservation_id = 15;
optional string node_label_expression = 16;
optional ResourceRequestProto am_container_resource_request = 17;
} }
message LogAggregationContextProto { message LogAggregationContextProto {

View File

@ -18,7 +18,8 @@
package org.apache.hadoop.yarn.api.records.impl.pb; package org.apache.hadoop.yarn.api.records.impl.pb;
import com.google.common.base.CharMatcher; import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
@ -29,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
@ -38,12 +40,11 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ReservationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
import com.google.common.base.CharMatcher;
import com.google.protobuf.TextFormat; import com.google.protobuf.TextFormat;
import java.util.HashSet;
import java.util.Set;
@Private @Private
@Unstable @Unstable
public class ApplicationSubmissionContextPBImpl public class ApplicationSubmissionContextPBImpl
@ -58,6 +59,7 @@ extends ApplicationSubmissionContext {
private ContainerLaunchContext amContainer = null; private ContainerLaunchContext amContainer = null;
private Resource resource = null; private Resource resource = null;
private Set<String> applicationTags = null; private Set<String> applicationTags = null;
private ResourceRequest amResourceRequest = null;
private LogAggregationContext logAggregationContext = null; private LogAggregationContext logAggregationContext = null;
private ReservationId reservationId = null; private ReservationId reservationId = null;
@ -117,6 +119,10 @@ extends ApplicationSubmissionContext {
builder.clearApplicationTags(); builder.clearApplicationTags();
builder.addAllApplicationTags(this.applicationTags); builder.addAllApplicationTags(this.applicationTags);
} }
if (this.amResourceRequest != null) {
builder.setAmContainerResourceRequest(
convertToProtoFormat(this.amResourceRequest));
}
if (this.logAggregationContext != null) { if (this.logAggregationContext != null) {
builder.setLogAggregationContext( builder.setLogAggregationContext(
convertToProtoFormat(this.logAggregationContext)); convertToProtoFormat(this.logAggregationContext));
@ -141,7 +147,6 @@ extends ApplicationSubmissionContext {
viaProto = false; viaProto = false;
} }
@Override @Override
public Priority getPriority() { public Priority getPriority() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
@ -415,6 +420,14 @@ extends ApplicationSubmissionContext {
return ((PriorityPBImpl)t).getProto(); return ((PriorityPBImpl)t).getProto();
} }
private ResourceRequestPBImpl convertFromProtoFormat(ResourceRequestProto p) {
return new ResourceRequestPBImpl(p);
}
private ResourceRequestProto convertToProtoFormat(ResourceRequest t) {
return ((ResourceRequestPBImpl)t).getProto();
}
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
return new ApplicationIdPBImpl(p); return new ApplicationIdPBImpl(p);
} }
@ -428,7 +441,8 @@ extends ApplicationSubmissionContext {
return new ContainerLaunchContextPBImpl(p); return new ContainerLaunchContextPBImpl(p);
} }
private ContainerLaunchContextProto convertToProtoFormat(ContainerLaunchContext t) { private ContainerLaunchContextProto convertToProtoFormat(
ContainerLaunchContext t) {
return ((ContainerLaunchContextPBImpl)t).getProto(); return ((ContainerLaunchContextPBImpl)t).getProto();
} }
@ -440,6 +454,47 @@ extends ApplicationSubmissionContext {
return ((ResourcePBImpl)t).getProto(); return ((ResourcePBImpl)t).getProto();
} }
@Override
public String getNodeLabelExpression() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasNodeLabelExpression()) {
return null;
}
return p.getNodeLabelExpression();
}
@Override
public void setNodeLabelExpression(String labelExpression) {
maybeInitBuilder();
if (labelExpression == null) {
builder.clearNodeLabelExpression();
return;
}
builder.setNodeLabelExpression(labelExpression);
}
@Override
public ResourceRequest getAMContainerResourceRequest() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
if (this.amResourceRequest != null) {
return amResourceRequest;
} // Else via proto
if (!p.hasAmContainerResourceRequest()) {
return null;
}
amResourceRequest = convertFromProtoFormat(p.getAmContainerResourceRequest());
return amResourceRequest;
}
@Override
public void setAMContainerResourceRequest(ResourceRequest request) {
maybeInitBuilder();
if (request == null) {
builder.clearAmContainerResourceRequest();
}
this.amResourceRequest = request;
}
@Override @Override
public long getAttemptFailuresValidityInterval() { public long getAttemptFailuresValidityInterval() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;

View File

@ -186,4 +186,23 @@ public class ResourceRequestPBImpl extends ResourceRequest {
+ ", Location: " + getResourceName() + ", Location: " + getResourceName()
+ ", Relax Locality: " + getRelaxLocality() + "}"; + ", Relax Locality: " + getRelaxLocality() + "}";
} }
@Override
public String getNodeLabelExpression() {
ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasNodeLabelExpression()) {
return null;
}
return (p.getNodeLabelExpression());
}
@Override
public void setNodeLabelExpression(String nodeLabelExpression) {
maybeInitBuilder();
if (nodeLabelExpression == null) {
builder.clearNodeLabelExpression();
return;
}
builder.setNodeLabelExpression(nodeLabelExpression);
}
} }