YARN-4335. Allow ResourceRequests to specify ExecutionType of a request ask (kkaranasos via asuresh)
(cherry picked from commit 8ffabfdf4f
)
This commit is contained in:
parent
63e5412f1a
commit
b2a654c5ee
|
@ -79,6 +79,15 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
|
||||||
public static ResourceRequest newInstance(Priority priority, String hostName,
|
public static ResourceRequest newInstance(Priority priority, String hostName,
|
||||||
Resource capability, int numContainers, boolean relaxLocality,
|
Resource capability, int numContainers, boolean relaxLocality,
|
||||||
String labelExpression) {
|
String labelExpression) {
|
||||||
|
return newInstance(priority, hostName, capability, numContainers,
|
||||||
|
relaxLocality, labelExpression, ExecutionType.GUARANTEED);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Public
|
||||||
|
@Stable
|
||||||
|
public static ResourceRequest newInstance(Priority priority, String hostName,
|
||||||
|
Resource capability, int numContainers, boolean relaxLocality, String
|
||||||
|
labelExpression, ExecutionType execType) {
|
||||||
ResourceRequest request = Records.newRecord(ResourceRequest.class);
|
ResourceRequest request = Records.newRecord(ResourceRequest.class);
|
||||||
request.setPriority(priority);
|
request.setPriority(priority);
|
||||||
request.setResourceName(hostName);
|
request.setResourceName(hostName);
|
||||||
|
@ -86,6 +95,7 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
|
||||||
request.setNumContainers(numContainers);
|
request.setNumContainers(numContainers);
|
||||||
request.setRelaxLocality(relaxLocality);
|
request.setRelaxLocality(relaxLocality);
|
||||||
request.setNodeLabelExpression(labelExpression);
|
request.setNodeLabelExpression(labelExpression);
|
||||||
|
request.setExecutionType(execType);
|
||||||
return request;
|
return request;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -221,7 +231,28 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
|
||||||
@Public
|
@Public
|
||||||
@Stable
|
@Stable
|
||||||
public abstract boolean getRelaxLocality();
|
public abstract boolean getRelaxLocality();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the <code>ExecutionType</code> of the requested container.
|
||||||
|
*
|
||||||
|
* @param execType
|
||||||
|
* ExecutionType of the requested container
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Stable
|
||||||
|
public abstract void setExecutionType(ExecutionType execType);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get whether locality relaxation is enabled with this
|
||||||
|
* <code>ResourceRequest</code>. Defaults to true.
|
||||||
|
*
|
||||||
|
* @return whether locality relaxation is enabled with this
|
||||||
|
* <code>ResourceRequest</code>.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Stable
|
||||||
|
public abstract ExecutionType getExecutionType();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>For a request at a network hierarchy level, set whether locality can be relaxed
|
* <p>For a request at a network hierarchy level, set whether locality can be relaxed
|
||||||
* to that level and beyond.<p>
|
* to that level and beyond.<p>
|
||||||
|
@ -322,6 +353,14 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
|
||||||
return false;
|
return false;
|
||||||
} else if (!priority.equals(other.getPriority()))
|
} else if (!priority.equals(other.getPriority()))
|
||||||
return false;
|
return false;
|
||||||
|
ExecutionType executionType = getExecutionType();
|
||||||
|
if (executionType == null) {
|
||||||
|
if (other.getExecutionType() != null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} else if (executionType != other.getExecutionType()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
if (getNodeLabelExpression() == null) {
|
if (getNodeLabelExpression() == null) {
|
||||||
if (other.getNodeLabelExpression() != null) {
|
if (other.getNodeLabelExpression() != null) {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -304,6 +304,7 @@ message ResourceRequestProto {
|
||||||
optional int32 num_containers = 4;
|
optional int32 num_containers = 4;
|
||||||
optional bool relax_locality = 5 [default = true];
|
optional bool relax_locality = 5 [default = true];
|
||||||
optional string node_label_expression = 6;
|
optional string node_label_expression = 6;
|
||||||
|
optional ExecutionTypeProto executionType = 7 [default = GUARANTEED];
|
||||||
}
|
}
|
||||||
|
|
||||||
enum AMCommandProto {
|
enum AMCommandProto {
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
@ -108,6 +109,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
|
||||||
final Priority priority;
|
final Priority priority;
|
||||||
final boolean relaxLocality;
|
final boolean relaxLocality;
|
||||||
final String nodeLabelsExpression;
|
final String nodeLabelsExpression;
|
||||||
|
final ExecutionType executionType;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Instantiates a {@link ContainerRequest} with the given constraints and
|
* Instantiates a {@link ContainerRequest} with the given constraints and
|
||||||
|
@ -152,6 +154,33 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
|
||||||
String[] racks, Priority priority, boolean relaxLocality) {
|
String[] racks, Priority priority, boolean relaxLocality) {
|
||||||
this(capability, nodes, racks, priority, relaxLocality, null);
|
this(capability, nodes, racks, priority, relaxLocality, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Instantiates a {@link ContainerRequest} with the given constraints.
|
||||||
|
*
|
||||||
|
* @param capability
|
||||||
|
* The {@link Resource} to be requested for each container.
|
||||||
|
* @param nodes
|
||||||
|
* Any hosts to request that the containers are placed on.
|
||||||
|
* @param racks
|
||||||
|
* Any racks to request that the containers are placed on. The
|
||||||
|
* racks corresponding to any hosts requested will be automatically
|
||||||
|
* added to this list.
|
||||||
|
* @param priority
|
||||||
|
* The priority at which to request the containers. Higher
|
||||||
|
* priorities have lower numerical values.
|
||||||
|
* @param relaxLocality
|
||||||
|
* If true, containers for this request may be assigned on hosts
|
||||||
|
* and racks other than the ones explicitly requested.
|
||||||
|
* @param nodeLabelsExpression
|
||||||
|
* Set node labels to allocate resource, now we only support
|
||||||
|
* asking for only a single node label
|
||||||
|
*/
|
||||||
|
public ContainerRequest(Resource capability, String[] nodes, String[] racks,
|
||||||
|
Priority priority, boolean relaxLocality, String nodeLabelsExpression) {
|
||||||
|
this(capability, nodes, racks, priority, relaxLocality, null,
|
||||||
|
ExecutionType.GUARANTEED);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Instantiates a {@link ContainerRequest} with the given constraints.
|
* Instantiates a {@link ContainerRequest} with the given constraints.
|
||||||
|
@ -173,10 +202,12 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
|
||||||
* @param nodeLabelsExpression
|
* @param nodeLabelsExpression
|
||||||
* Set node labels to allocate resource, now we only support
|
* Set node labels to allocate resource, now we only support
|
||||||
* asking for only a single node label
|
* asking for only a single node label
|
||||||
|
* @param executionType
|
||||||
|
* Set the execution type of the container request.
|
||||||
*/
|
*/
|
||||||
public ContainerRequest(Resource capability, String[] nodes,
|
public ContainerRequest(Resource capability, String[] nodes, String[] racks,
|
||||||
String[] racks, Priority priority, boolean relaxLocality,
|
Priority priority, boolean relaxLocality, String nodeLabelsExpression,
|
||||||
String nodeLabelsExpression) {
|
ExecutionType executionType) {
|
||||||
// Validate request
|
// Validate request
|
||||||
Preconditions.checkArgument(capability != null,
|
Preconditions.checkArgument(capability != null,
|
||||||
"The Resource to be requested for each container " +
|
"The Resource to be requested for each container " +
|
||||||
|
@ -194,6 +225,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
|
||||||
this.priority = priority;
|
this.priority = priority;
|
||||||
this.relaxLocality = relaxLocality;
|
this.relaxLocality = relaxLocality;
|
||||||
this.nodeLabelsExpression = nodeLabelsExpression;
|
this.nodeLabelsExpression = nodeLabelsExpression;
|
||||||
|
this.executionType = executionType;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Resource getCapability() {
|
public Resource getCapability() {
|
||||||
|
@ -220,10 +252,15 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
|
||||||
return nodeLabelsExpression;
|
return nodeLabelsExpression;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ExecutionType getExecutionType() {
|
||||||
|
return executionType;
|
||||||
|
}
|
||||||
|
|
||||||
public String toString() {
|
public String toString() {
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder();
|
||||||
sb.append("Capability[").append(capability).append("]");
|
sb.append("Capability[").append(capability).append("]");
|
||||||
sb.append("Priority[").append(priority).append("]");
|
sb.append("Priority[").append(priority).append("]");
|
||||||
|
sb.append("ExecutionType[").append(executionType).append("]");
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
@ -206,4 +207,24 @@ public class ResourceRequestPBImpl extends ResourceRequest {
|
||||||
}
|
}
|
||||||
builder.setNodeLabelExpression(nodeLabelExpression);
|
builder.setNodeLabelExpression(nodeLabelExpression);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ExecutionType getExecutionType() {
|
||||||
|
ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
if (!p.hasExecutionType()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return ProtoUtils.convertFromProtoFormat(p.getExecutionType());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setExecutionType(ExecutionType execType) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
if (execType == null) {
|
||||||
|
builder.clearExecutionType();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
builder.setExecutionType(ProtoUtils.convertToProtoFormat(execType));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
Loading…
Reference in New Issue