YARN-4887. Add allocation request ID to AM-RM protocol for identifying resource-requests explicitly. (Subru Krishnan via wangda)

(cherry picked from commit eec835ec17)
This commit is contained in:
Wangda Tan 2016-06-13 21:57:33 -07:00
parent d219550e8b
commit cd3bdbc70a
5 changed files with 128 additions and 1 deletions

View File

@ -20,6 +20,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
@ -189,4 +190,48 @@ public static Container newInstance(ContainerId containerId, NodeId nodeId,
@Private @Private
@Unstable @Unstable
public abstract void setExecutionType(ExecutionType executionType); public abstract void setExecutionType(ExecutionType executionType);
/**
* Get the optional <em>ID</em> corresponding to the original {@code
* ResourceRequest{@link #getAllocationRequestId()}}s which is satisfied by
* this allocated {@code Container}.
* <p>
* The scheduler may return multiple {@code AllocateResponse}s corresponding
* to the same ID as and when scheduler allocates {@code Container}s.
* <b>Applications</b> can continue to completely ignore the returned ID in
* the response and use the allocation for any of their outstanding requests.
* <p>
*
* @return the <em>ID</em> corresponding to the original allocation request
* which is satisfied by this allocation.
*/
@Public
@Evolving
public long getAllocationRequestId() {
throw new UnsupportedOperationException();
}
/**
* Set the optional <em>ID</em> corresponding to the original {@code
* ResourceRequest{@link #setAllocationRequestId(long)}
* etAllocationRequestId()}}s which is satisfied by this allocated {@code
* Container}.
* <p>
* The scheduler may return multiple {@code AllocateResponse}s corresponding
* to the same ID as and when scheduler allocates {@code Container}s.
* <b>Applications</b> can continue to completely ignore the returned ID in
* the response and use the allocation for any of their outstanding requests.
* If the ID is not set, scheduler will continue to work as previously and all
* allocated {@code Container}(s) will have the default ID, -1.
* <p>
*
* @param allocationRequestID the <em>ID</em> corresponding to the original
* allocation request which is satisfied by this
* allocation.
*/
@Private
@Evolving
public void setAllocationRequestId(long allocationRequestID) {
throw new UnsupportedOperationException();
}
} }

View File

@ -312,6 +312,58 @@ public ExecutionTypeRequest getExecutionTypeRequest() {
@Public @Public
@Evolving @Evolving
public abstract void setNodeLabelExpression(String nodelabelExpression); public abstract void setNodeLabelExpression(String nodelabelExpression);
/**
* Get the optional <em>ID</em> corresponding to this allocation request. This
* ID is an identifier for different {@code ResourceRequest}s from the <b>same
* application</b>. The allocated {@code Container}(s) received as part of the
* {@code AllocateResponse} response will have the ID corresponding to the
* original {@code ResourceRequest} for which the RM made the allocation.
* <p>
* The scheduler may return multiple {@code AllocateResponse}s corresponding
* to the same ID as and when scheduler allocates {@code Container}(s).
* <b>Applications</b> can continue to completely ignore the returned ID in
* the response and use the allocation for any of their outstanding requests.
* <p>
* If one wishes to replace an entire {@code ResourceRequest} corresponding to
* a specific ID, they can simply cancel the corresponding {@code
* ResourceRequest} and submit a new one afresh.
*
* @return the <em>ID</em> corresponding to this allocation request.
*/
@Public
@Evolving
public long getAllocationRequestId() {
throw new UnsupportedOperationException();
}
/**
* Set the optional <em>ID</em> corresponding to this allocation request. This
* ID is an identifier for different {@code ResourceRequest}s from the <b>same
* application</b>. The allocated {@code Container}(s) received as part of the
* {@code AllocateResponse} response will have the ID corresponding to the
* original {@code ResourceRequest} for which the RM made the allocation.
* <p>
* The scheduler may return multiple {@code AllocateResponse}s corresponding
* to the same ID as and when scheduler allocates {@code Container}(s).
* <b>Applications</b> can continue to completely ignore the returned ID in
* the response and use the allocation for any of their outstanding requests.
* <p>
* If one wishes to replace an entire {@code ResourceRequest} corresponding to
* a specific ID, they can simply cancel the corresponding {@code
* ResourceRequest} and submit a new one afresh.
* <p>
* If the ID is not set, scheduler will continue to work as previously and all
* allocated {@code Container}(s) will have the default ID, -1.
*
* @param allocationRequestID the <em>ID</em> corresponding to this allocation
* request.
*/
@Public
@Evolving
public void setAllocationRequestId(long allocationRequestID) {
throw new UnsupportedOperationException();
}
@Override @Override
public int hashCode() { public int hashCode() {

View File

@ -93,6 +93,7 @@ message ContainerProto {
optional PriorityProto priority = 5; optional PriorityProto priority = 5;
optional hadoop.common.TokenProto container_token = 6; optional hadoop.common.TokenProto container_token = 6;
optional ExecutionTypeProto execution_type = 7 [default = GUARANTEED]; optional ExecutionTypeProto execution_type = 7 [default = GUARANTEED];
optional int64 allocation_request_id = 8 [default = -1];
} }
message ContainerReportProto { message ContainerReportProto {
@ -307,6 +308,7 @@ message ResourceRequestProto {
optional bool relax_locality = 5 [default = true]; optional bool relax_locality = 5 [default = true];
optional string node_label_expression = 6; optional string node_label_expression = 6;
optional ExecutionTypeRequestProto execution_type_request = 7; optional ExecutionTypeRequestProto execution_type_request = 7;
optional int64 allocation_request_id = 8 [default = -1];
} }
message ExecutionTypeRequestProto { message ExecutionTypeRequestProto {

View File

@ -262,6 +262,18 @@ public void setExecutionType(ExecutionType executionType) {
builder.setExecutionType(convertToProtoFormat(executionType)); builder.setExecutionType(convertToProtoFormat(executionType));
} }
@Override
public long getAllocationRequestId() {
ContainerProtoOrBuilder p = viaProto ? proto : builder;
return (p.getAllocationRequestId());
}
@Override
public void setAllocationRequestId(long allocationRequestID) {
maybeInitBuilder();
builder.setAllocationRequestId(allocationRequestID);
}
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
return new ContainerIdPBImpl(p); return new ContainerIdPBImpl(p);
} }
@ -315,6 +327,8 @@ public String toString() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("Container: ["); sb.append("Container: [");
sb.append("ContainerId: ").append(getId()).append(", "); sb.append("ContainerId: ").append(getId()).append(", ");
sb.append("AllocationRequestId: ").append(getAllocationRequestId())
.append(", ");
sb.append("NodeId: ").append(getNodeId()).append(", "); sb.append("NodeId: ").append(getNodeId()).append(", ");
sb.append("NodeHttpAddress: ").append(getNodeHttpAddress()).append(", "); sb.append("NodeHttpAddress: ").append(getNodeHttpAddress()).append(", ");
sb.append("Resource: ").append(getResource()).append(", "); sb.append("Resource: ").append(getResource()).append(", ");

View File

@ -192,6 +192,18 @@ public void setRelaxLocality(boolean relaxLocality) {
builder.setRelaxLocality(relaxLocality); builder.setRelaxLocality(relaxLocality);
} }
@Override
public long getAllocationRequestId() {
ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
return (p.getAllocationRequestId());
}
@Override
public void setAllocationRequestId(long allocationRequestID) {
maybeInitBuilder();
builder.setAllocationRequestId(allocationRequestID);
}
private PriorityPBImpl convertFromProtoFormat(PriorityProto p) { private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
return new PriorityPBImpl(p); return new PriorityPBImpl(p);
} }
@ -210,7 +222,9 @@ private ResourceProto convertToProtoFormat(Resource t) {
@Override @Override
public String toString() { public String toString() {
return "{Priority: " + getPriority() + ", Capability: " + getCapability() return "{AllocationRequestId: " + getAllocationRequestId()
+ ", Priority: " + getPriority()
+ ", Capability: " + getCapability()
+ ", # Containers: " + getNumContainers() + ", # Containers: " + getNumContainers()
+ ", Location: " + getResourceName() + ", Location: " + getResourceName()
+ ", Relax Locality: " + getRelaxLocality() + ", Relax Locality: " + getRelaxLocality()