YARN-4889. Changes in AMRMClient for identifying resource-requests explicitly. (Arun Suresh via wangda)

This commit is contained in:
Wangda Tan 2016-08-26 16:48:00 -07:00
parent b930dc3ec0
commit 19c743c1bb
9 changed files with 476 additions and 120 deletions

View File

@ -111,6 +111,10 @@ public int compare(ResourceRequest r1, ResourceRequest r2) {
// Compare priority, host and capability
int ret = r1.getPriority().compareTo(r2.getPriority());
if (ret == 0) {
ret = Long.compare(
r1.getAllocationRequestId(), r2.getAllocationRequestId());
}
if (ret == 0) {
String h1 = r1.getResourceName();
String h2 = r2.getResourceName();
@ -381,6 +385,7 @@ public int hashCode() {
result = prime * result + ((hostName == null) ? 0 : hostName.hashCode());
result = prime * result + getNumContainers();
result = prime * result + ((priority == null) ? 0 : priority.hashCode());
result = prime * result + Long.valueOf(getAllocationRequestId()).hashCode();
return result;
}
@ -422,6 +427,11 @@ public boolean equals(Object obj) {
.equals(other.getExecutionTypeRequest().getExecutionType())) {
return false;
}
if (getAllocationRequestId() != other.getAllocationRequestId()) {
return false;
}
if (getNodeLabelExpression() == null) {
if (other.getNodeLabelExpression() != null) {
return false;
@ -452,7 +462,14 @@ public int compareTo(ResourceRequest other) {
int capabilityComparison =
this.getCapability().compareTo(other.getCapability());
if (capabilityComparison == 0) {
return this.getNumContainers() - other.getNumContainers();
int numContainerComparison =
this.getNumContainers() - other.getNumContainers();
if (numContainerComparison == 0) {
return Long.compare(getAllocationRequestId(),
other.getAllocationRequestId());
} else {
return numContainerComparison;
}
} else {
return capabilityComparison;
}

View File

@ -110,6 +110,7 @@ public static class ContainerRequest {
final List<String> nodes;
final List<String> racks;
final Priority priority;
final long allocationRequestId;
final boolean relaxLocality;
final String nodeLabelsExpression;
final ExecutionTypeRequest executionTypeRequest;
@ -134,6 +135,31 @@ public ContainerRequest(Resource capability, String[] nodes,
String[] racks, Priority priority) {
this(capability, nodes, racks, priority, true, null);
}
/**
* Instantiates a {@link ContainerRequest} with the given constraints and
* locality relaxation enabled.
*
* @param capability
* The {@link Resource} to be requested for each container.
* @param nodes
* Any hosts to request that the containers are placed on.
* @param racks
* Any racks to request that the containers are placed on. The
* racks corresponding to any hosts requested will be automatically
* added to this list.
* @param priority
* The priority at which to request the containers. Higher
* priorities have lower numerical values.
* @param allocationRequestId Allocation Request Id
*/
@Public
@InterfaceStability.Evolving
public ContainerRequest(Resource capability, String[] nodes,
String[] racks, Priority priority, long allocationRequestId) {
this(capability, nodes, racks, priority, allocationRequestId, true, null,
ExecutionTypeRequest.newInstance());
}
/**
* Instantiates a {@link ContainerRequest} with the given constraints.
@ -175,20 +201,20 @@ public ContainerRequest(Resource capability, String[] nodes,
* @param relaxLocality
* If true, containers for this request may be assigned on hosts
* and racks other than the ones explicitly requested.
* @param nodeLabelsExpression
* Set node labels to allocate resource, now we only support
* asking for only a single node label
* @param allocationRequestId Allocation Request Id
*/
public ContainerRequest(Resource capability, String[] nodes, String[] racks,
Priority priority, boolean relaxLocality, String nodeLabelsExpression) {
this(capability, nodes, racks, priority, relaxLocality,
nodeLabelsExpression,
ExecutionTypeRequest.newInstance());
@Public
@InterfaceStability.Evolving
public ContainerRequest(Resource capability, String[] nodes,
String[] racks, Priority priority, long allocationRequestId,
boolean relaxLocality) {
this(capability, nodes, racks, priority, allocationRequestId,
relaxLocality, null, ExecutionTypeRequest.newInstance());
}
/**
* Instantiates a {@link ContainerRequest} with the given constraints.
*
*
* @param capability
* The {@link Resource} to be requested for each container.
* @param nodes
@ -206,11 +232,79 @@ public ContainerRequest(Resource capability, String[] nodes, String[] racks,
* @param nodeLabelsExpression
* Set node labels to allocate resource, now we only support
* asking for only a single node label
*/
public ContainerRequest(Resource capability, String[] nodes, String[] racks,
Priority priority, boolean relaxLocality, String nodeLabelsExpression) {
this(capability, nodes, racks, priority, 0, relaxLocality,
nodeLabelsExpression,
ExecutionTypeRequest.newInstance());
}
/**
* Instantiates a {@link ContainerRequest} with the given constraints.
*
* @param capability
* The {@link Resource} to be requested for each container.
* @param nodes
* Any hosts to request that the containers are placed on.
* @param racks
* Any racks to request that the containers are placed on. The
* racks corresponding to any hosts requested will be automatically
* added to this list.
* @param priority
* The priority at which to request the containers. Higher
* priorities have lower numerical values.
* @param allocationRequestId
* The allocationRequestId of the request. To be used as a tracking
* id to match Containers allocated against this request. Will
* default to 0 if not specified.
* @param relaxLocality
* If true, containers for this request may be assigned on hosts
* and racks other than the ones explicitly requested.
* @param nodeLabelsExpression
* Set node labels to allocate resource, now we only support
* asking for only a single node label
*/
@Public
@InterfaceStability.Evolving
public ContainerRequest(Resource capability, String[] nodes, String[] racks,
Priority priority, long allocationRequestId, boolean relaxLocality,
String nodeLabelsExpression) {
this(capability, nodes, racks, priority, allocationRequestId,
relaxLocality, nodeLabelsExpression,
ExecutionTypeRequest.newInstance());
}
/**
* Instantiates a {@link ContainerRequest} with the given constraints.
*
* @param capability
* The {@link Resource} to be requested for each container.
* @param nodes
* Any hosts to request that the containers are placed on.
* @param racks
* Any racks to request that the containers are placed on. The
* racks corresponding to any hosts requested will be automatically
* added to this list.
* @param priority
* The priority at which to request the containers. Higher
* priorities have lower numerical values.
* @param allocationRequestId
* The allocationRequestId of the request. To be used as a tracking
* id to match Containers allocated against this request. Will
* default to 0 if not specified.
* @param relaxLocality
* If true, containers for this request may be assigned on hosts
* and racks other than the ones explicitly requested.
* @param nodeLabelsExpression
* Set node labels to allocate resource, now we only support
* asking for only a single node label
* @param executionTypeRequest
* Set the execution type of the container request.
*/
public ContainerRequest(Resource capability, String[] nodes, String[] racks,
Priority priority, boolean relaxLocality, String nodeLabelsExpression,
Priority priority, long allocationRequestId, boolean relaxLocality,
String nodeLabelsExpression,
ExecutionTypeRequest executionTypeRequest) {
// Validate request
Preconditions.checkArgument(capability != null,
@ -223,6 +317,7 @@ public ContainerRequest(Resource capability, String[] nodes, String[] racks,
&& (nodes == null || nodes.length == 0)),
"Can't turn off locality relaxation on a " +
"request with no location constraints");
this.allocationRequestId = allocationRequestId;
this.capability = capability;
this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
@ -247,6 +342,10 @@ public List<String> getRacks() {
public Priority getPriority() {
return priority;
}
public long getAllocationRequestId() {
return allocationRequestId;
}
public boolean getRelaxLocality() {
return relaxLocality;
@ -264,6 +363,7 @@ public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Capability[").append(capability).append("]");
sb.append("Priority[").append(priority).append("]");
sb.append("AllocationRequestId[").append(allocationRequestId).append("]");
sb.append("ExecutionTypeRequest[").append(executionTypeRequest)
.append("]");
return sb.toString();
@ -390,6 +490,10 @@ public abstract void requestContainerResourceChange(
* Each collection in the list contains requests with identical
* <code>Resource</code> size that fit in the given capability. In a
* collection, requests will be returned in the same order as they were added.
*
* NOTE: This API only matches Container requests that were created by the
* client WITHOUT the allocationRequestId being set.
*
* @return Collection of request matching the parameters
*/
@InterfaceStability.Evolving
@ -407,7 +511,11 @@ public abstract List<? extends Collection<T>> getMatchingRequests(
* Each collection in the list contains requests with identical
* <code>Resource</code> size that fit in the given capability. In a
* collection, requests will be returned in the same order as they were added.
* specify an <code>ExecutionType</code> .
* specify an <code>ExecutionType</code>.
*
* NOTE: This API only matches Container requests that were created by the
* client WITHOUT the allocationRequestId being set.
*
* @param priority Priority
* @param resourceName Location
* @param executionType ExecutionType
@ -421,7 +529,23 @@ public List<? extends Collection<T>> getMatchingRequests(
throw new UnsupportedOperationException("The sub-class extending" +
" AMRMClient is expected to implement this !!");
}
/**
* Get outstanding <code>ContainerRequest</code>s matching the given
* allocationRequestId. These ContainerRequests should have been added via
* <code>addContainerRequest</code> earlier in the lifecycle. For performance,
* the AMRMClient may return its internal collection directly without creating
* a copy. Users should not perform mutable operations on the return value.
*
* NOTE: This API only matches Container requests that were created by the
* client WITH the allocationRequestId being set to a non-default value.
*
* @param allocationRequestId Allocation Request Id
* @return Collection of request matching the parameters
*/
@InterfaceStability.Evolving
public abstract Collection<T> getMatchingRequests(long allocationRequestId);
/**
* Update application's blacklist with addition or removal resources.
*

View File

@ -202,6 +202,10 @@ public abstract List<? extends Collection<T>> getMatchingRequests(
/**
* Returns all matching ContainerRequests that match the given Priority,
* ResourceName, ExecutionType and Capability.
*
* NOTE: This matches only requests that were made by the client WITHOUT the
* allocationRequestId specified.
*
* @param priority Priority.
* @param resourceName Location.
* @param executionType ExecutionType.
@ -214,6 +218,20 @@ public List<? extends Collection<T>> getMatchingRequests(
return client.getMatchingRequests(priority, resourceName,
executionType, capability);
}
/**
* Returns all matching ContainerRequests that match the given
* AllocationRequestId.
*
* NOTE: This matches only requests that were made by the client WITH the
* allocationRequestId specified.
*
* @param allocationRequestId AllocationRequestId.
* @return All matching ContainerRequests
*/
public Collection<T> getMatchingRequests(long allocationRequestId) {
return client.getMatchingRequests(allocationRequestId);
}
/**
* Registers this application master with the resource manager. On successful

View File

@ -108,10 +108,11 @@ static class ResourceRequestInfo<T> {
ResourceRequest remoteRequest;
LinkedHashSet<T> containerRequests;
ResourceRequestInfo(Priority priority, String resourceName,
Resource capability, boolean relaxLocality) {
ResourceRequestInfo(Long allocationRequestId, Priority priority,
String resourceName, Resource capability, boolean relaxLocality) {
remoteRequest = ResourceRequest.newInstance(priority, resourceName,
capability, 0);
remoteRequest.setAllocationRequestId(allocationRequestId);
remoteRequest.setRelaxLocality(relaxLocality);
containerRequests = new LinkedHashSet<T>();
}
@ -154,7 +155,8 @@ static boolean canFit(Resource arg0, Resource arg1) {
return (mem0 <= mem1 && cpu0 <= cpu1);
}
final RemoteRequestsTable remoteRequestsTable = new RemoteRequestsTable<T>();
private final Map<Long, RemoteRequestsTable<T>> remoteRequests =
new HashMap<>();
protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
@ -263,10 +265,12 @@ public AllocateResponse allocate(float progressIndicator)
for(ResourceRequest r : ask) {
// create a copy of ResourceRequest as we might change it while the
// RPC layer is using it to send info across
askList.add(ResourceRequest.newInstance(r.getPriority(),
ResourceRequest rr = ResourceRequest.newInstance(r.getPriority(),
r.getResourceName(), r.getCapability(), r.getNumContainers(),
r.getRelaxLocality(), r.getNodeLabelExpression(),
r.getExecutionTypeRequest()));
r.getExecutionTypeRequest());
rr.setAllocationRequestId(r.getAllocationRequestId());
askList.add(rr);
}
List<ContainerResourceChangeRequest> increaseList = new ArrayList<>();
List<ContainerResourceChangeRequest> decreaseList = new ArrayList<>();
@ -318,11 +322,14 @@ public AllocateResponse allocate(float progressIndicator)
synchronized (this) {
release.addAll(this.pendingRelease);
blacklistAdditions.addAll(this.blacklistedNodes);
@SuppressWarnings("unchecked")
Iterator<ResourceRequestInfo<T>> reqIter =
remoteRequestsTable.iterator();
while (reqIter.hasNext()) {
addResourceRequestToAsk(reqIter.next().remoteRequest);
for (RemoteRequestsTable remoteRequestsTable :
remoteRequests.values()) {
@SuppressWarnings("unchecked")
Iterator<ResourceRequestInfo<T>> reqIter =
remoteRequestsTable.iterator();
while (reqIter.hasNext()) {
addResourceRequestToAsk(reqIter.next().remoteRequest);
}
}
change.putAll(this.pendingChange);
}
@ -498,15 +505,16 @@ public synchronized void addContainerRequest(T req) {
// check that specific and non-specific requests cannot be mixed within a
// priority
checkLocalityRelaxationConflict(req.getPriority(), ANY_LIST,
req.getRelaxLocality());
checkLocalityRelaxationConflict(req.getAllocationRequestId(),
req.getPriority(), ANY_LIST, req.getRelaxLocality());
// check that specific rack cannot be mixed with specific node within a
// priority. If node and its rack are both specified then they must be
// in the same request.
// For explicitly requested racks, we set locality relaxation to true
checkLocalityRelaxationConflict(req.getPriority(), dedupedRacks, true);
checkLocalityRelaxationConflict(req.getPriority(), inferredRacks,
req.getRelaxLocality());
checkLocalityRelaxationConflict(req.getAllocationRequestId(),
req.getPriority(), dedupedRacks, true);
checkLocalityRelaxationConflict(req.getAllocationRequestId(),
req.getPriority(), inferredRacks, req.getRelaxLocality());
// check if the node label expression specified is valid
checkNodeLabelExpression(req);
@ -607,6 +615,24 @@ public synchronized int getClusterNodeCount() {
return clusterNodeCount;
}
@Override
@SuppressWarnings("unchecked")
public Collection<T> getMatchingRequests(long allocationRequestId) {
RemoteRequestsTable remoteRequestsTable = getTable(allocationRequestId);
LinkedHashSet<T> list = new LinkedHashSet<>();
if (remoteRequestsTable != null) {
Iterator<ResourceRequestInfo<T>> reqIter =
remoteRequestsTable.iterator();
while (reqIter.hasNext()) {
ResourceRequestInfo<T> resReqInfo = reqIter.next();
list.addAll(resReqInfo.containerRequests);
}
}
return list;
}
@Override
public synchronized List<? extends Collection<T>> getMatchingRequests(
Priority priority,
@ -617,6 +643,7 @@ public synchronized List<? extends Collection<T>> getMatchingRequests(
}
@Override
@SuppressWarnings("unchecked")
public synchronized List<? extends Collection<T>> getMatchingRequests(
Priority priority, String resourceName, ExecutionType executionType,
Resource capability) {
@ -626,9 +653,9 @@ public synchronized List<? extends Collection<T>> getMatchingRequests(
"The priority at which to request containers should not be null ");
List<LinkedHashSet<T>> list = new LinkedList<LinkedHashSet<T>>();
@SuppressWarnings("unchecked")
RemoteRequestsTable remoteRequestsTable = getTable(0);
List<ResourceRequestInfo<T>> matchingRequests =
this.remoteRequestsTable.getMatchingRequests(priority, resourceName,
remoteRequestsTable.getMatchingRequests(priority, resourceName,
executionType, capability);
// If no exact match. Container may be larger than what was requested.
// get all resources <= capability. map is reverse sorted.
@ -664,23 +691,26 @@ private Set<String> resolveRacks(List<String> nodes) {
* ContainerRequests with locality relaxation cannot be made at the same
* priority as ContainerRequests without locality relaxation.
*/
private void checkLocalityRelaxationConflict(Priority priority,
Collection<String> locations, boolean relaxLocality) {
private void checkLocalityRelaxationConflict(Long allocationReqId,
Priority priority, Collection<String> locations, boolean relaxLocality) {
// Locality relaxation will be set to relaxLocality for all implicitly
// requested racks. Make sure that existing rack requests match this.
@SuppressWarnings("unchecked")
List<ResourceRequestInfo> allCapabilityMaps =
remoteRequestsTable.getAllResourceRequestInfos(priority, locations);
for (ResourceRequestInfo reqs : allCapabilityMaps) {
ResourceRequest remoteRequest = reqs.remoteRequest;
boolean existingRelaxLocality = remoteRequest.getRelaxLocality();
if (relaxLocality != existingRelaxLocality) {
throw new InvalidContainerRequestException("Cannot submit a "
+ "ContainerRequest asking for location "
+ remoteRequest.getResourceName() + " with locality relaxation "
+ relaxLocality + " when it has already been requested"
+ "with locality relaxation " + existingRelaxLocality);
RemoteRequestsTable<T> remoteRequestsTable = getTable(allocationReqId);
if (remoteRequestsTable != null) {
@SuppressWarnings("unchecked")
List<ResourceRequestInfo> allCapabilityMaps =
remoteRequestsTable.getAllResourceRequestInfos(priority, locations);
for (ResourceRequestInfo reqs : allCapabilityMaps) {
ResourceRequest remoteRequest = reqs.remoteRequest;
boolean existingRelaxLocality = remoteRequest.getRelaxLocality();
if (relaxLocality != existingRelaxLocality) {
throw new InvalidContainerRequestException("Cannot submit a "
+ "ContainerRequest asking for location "
+ remoteRequest.getResourceName() + " with locality relaxation "
+ relaxLocality + " when it has already been requested"
+ "with locality relaxation " + existingRelaxLocality);
}
}
}
}
@ -742,10 +772,17 @@ private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
private void addResourceRequest(Priority priority, String resourceName,
ExecutionTypeRequest execTypeReq, Resource capability, T req,
boolean relaxLocality, String labelExpression) {
RemoteRequestsTable<T> remoteRequestsTable =
getTable(req.getAllocationRequestId());
if (remoteRequestsTable == null) {
remoteRequestsTable = new RemoteRequestsTable<T>();
putTable(req.getAllocationRequestId(), remoteRequestsTable);
}
@SuppressWarnings("unchecked")
ResourceRequestInfo resourceRequestInfo = remoteRequestsTable
.addResourceRequest(priority, resourceName,
execTypeReq, capability, req, relaxLocality, labelExpression);
.addResourceRequest(req.getAllocationRequestId(), priority,
resourceName, execTypeReq, capability, req, relaxLocality,
labelExpression);
// Note this down for next interaction with ResourceManager
addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
@ -761,29 +798,37 @@ private void addResourceRequest(Priority priority, String resourceName,
private void decResourceRequest(Priority priority, String resourceName,
ExecutionTypeRequest execTypeReq, Resource capability, T req) {
@SuppressWarnings("unchecked")
ResourceRequestInfo resourceRequestInfo =
remoteRequestsTable.decResourceRequest(priority, resourceName,
execTypeReq, capability, req);
// send the ResourceRequest to RM even if is 0 because it needs to override
// a previously sent value. If ResourceRequest was not sent previously then
// sending 0 aught to be a no-op on RM
if (resourceRequestInfo != null) {
addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
RemoteRequestsTable<T> remoteRequestsTable =
getTable(req.getAllocationRequestId());
if (remoteRequestsTable != null) {
@SuppressWarnings("unchecked")
ResourceRequestInfo resourceRequestInfo =
remoteRequestsTable.decResourceRequest(priority, resourceName,
execTypeReq, capability, req);
// send the ResourceRequest to RM even if is 0 because it needs to
// override a previously sent value. If ResourceRequest was not sent
// previously then sending 0 ought to be a no-op on RM
if (resourceRequestInfo != null) {
addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
// delete entry from map if no longer needed
if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) {
this.remoteRequestsTable.remove(priority, resourceName,
execTypeReq.getExecutionType(), capability);
}
// delete entry from map if no longer needed
if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) {
remoteRequestsTable.remove(priority, resourceName,
execTypeReq.getExecutionType(), capability);
}
if (LOG.isDebugEnabled()) {
LOG.debug("AFTER decResourceRequest:" + " applicationId="
+ " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers="
+ resourceRequestInfo.remoteRequest.getNumContainers()
+ " #asks=" + ask.size());
if (LOG.isDebugEnabled()) {
LOG.debug("AFTER decResourceRequest:"
+ " allocationRequestId=" + req.getAllocationRequestId()
+ " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers="
+ resourceRequestInfo.remoteRequest.getNumContainers()
+ " #asks=" + ask.size());
}
}
} else {
LOG.info("No remoteRequestTable found with allocationRequestId="
+ req.getAllocationRequestId());
}
}
@ -829,4 +874,14 @@ private void updateAMRMToken(Token token) throws IOException {
currentUGI.addToken(amrmToken);
amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConfig()));
}
@VisibleForTesting
RemoteRequestsTable<T> getTable(long allocationRequestId) {
return remoteRequests.get(Long.valueOf(allocationRequestId));
}
RemoteRequestsTable<T> putTable(long allocationRequestId,
RemoteRequestsTable<T> table) {
return remoteRequests.put(Long.valueOf(allocationRequestId), table);
}
}

View File

@ -264,15 +264,16 @@ List<ResourceRequestInfo> getMatchingRequests(
}
@SuppressWarnings("unchecked")
ResourceRequestInfo addResourceRequest(Priority priority, String resourceName,
ExecutionTypeRequest execTypeReq, Resource capability, T req,
boolean relaxLocality, String labelExpression) {
ResourceRequestInfo addResourceRequest(Long allocationRequestId,
Priority priority, String resourceName, ExecutionTypeRequest execTypeReq,
Resource capability, T req, boolean relaxLocality,
String labelExpression) {
ResourceRequestInfo resourceRequestInfo = get(priority, resourceName,
execTypeReq.getExecutionType(), capability);
if (resourceRequestInfo == null) {
resourceRequestInfo =
new ResourceRequestInfo(priority, resourceName, capability,
relaxLocality);
new ResourceRequestInfo(allocationRequestId, priority, resourceName,
capability, relaxLocality);
put(priority, resourceName, execTypeReq.getExecutionType(), capability,
resourceRequestInfo);
}

View File

@ -33,6 +33,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@ -268,6 +269,18 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException {
amClient.addContainerRequest(storedContainer5);
amClient.addContainerRequest(storedContainer6);
amClient.addContainerRequest(storedContainer7);
// Add some CRs with allocReqIds... These will not be returned by
// the default getMatchingRequests
ContainerRequest storedContainer11 =
new ContainerRequest(capability1, nodes, racks, priority, 1);
ContainerRequest storedContainer33 =
new ContainerRequest(capability3, nodes, racks, priority, 3);
ContainerRequest storedContainer43 =
new ContainerRequest(capability4, nodes, racks, priority, 3);
amClient.addContainerRequest(storedContainer11);
amClient.addContainerRequest(storedContainer33);
amClient.addContainerRequest(storedContainer43);
// test matching of containers
List<? extends Collection<ContainerRequest>> matches;
@ -279,6 +292,25 @@ public void testAMRMClientMatchingFit() throws YarnException, IOException {
storedRequest = matches.get(0).iterator().next();
assertEquals(storedContainer1, storedRequest);
amClient.removeContainerRequest(storedContainer1);
// exact match for allocReqId 1
Collection<ContainerRequest> reqIdMatches =
amClient.getMatchingRequests(1);
assertEquals(1, reqIdMatches.size());
storedRequest = reqIdMatches.iterator().next();
assertEquals(storedContainer11, storedRequest);
amClient.removeContainerRequest(storedContainer11);
// exact match for allocReqId 3
reqIdMatches = amClient.getMatchingRequests(3);
assertEquals(2, reqIdMatches.size());
Iterator<ContainerRequest> iter = reqIdMatches.iterator();
storedRequest = iter.next();
assertEquals(storedContainer43, storedRequest);
amClient.removeContainerRequest(storedContainer43);
storedRequest = iter.next();
assertEquals(storedContainer33, storedRequest);
amClient.removeContainerRequest(storedContainer33);
// exact matching with order maintained
Resource testCapability2 = Resource.newInstance(2000, 1);
@ -364,26 +396,32 @@ public void testAMRMClientMatchingFitExecType()
ContainerRequest storedGuarContainer2 =
new ContainerRequest(capability2, nodes, racks, priority);
ContainerRequest storedOpportContainer1 =
new ContainerRequest(capability1, nodes, racks, priority, true, null,
new ContainerRequest(capability1, nodes, racks, priority,
0, true, null,
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
ContainerRequest storedOpportContainer2 =
new ContainerRequest(capability2, nodes, racks, priority, true, null,
new ContainerRequest(capability2, nodes, racks, priority,
0, true, null,
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
ContainerRequest storedOpportContainer3 =
new ContainerRequest(capability3, nodes, racks, priority, true, null,
new ContainerRequest(capability3, nodes, racks, priority,
0, true, null,
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
ContainerRequest storedOpportContainer4 =
new ContainerRequest(capability4, nodes, racks, priority, true, null,
new ContainerRequest(capability4, nodes, racks, priority,
0, true, null,
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
ContainerRequest storedOpportContainer5 =
new ContainerRequest(capability5, nodes, racks, priority, true, null,
new ContainerRequest(capability5, nodes, racks, priority,
0, true, null,
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
ContainerRequest storedOpportContainer6 =
new ContainerRequest(capability6, nodes, racks, priority, true, null,
new ContainerRequest(capability6, nodes, racks, priority,
0, true, null,
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
ContainerRequest storedOpportContainer7 =
new ContainerRequest(capability7, nodes, racks, priority2,
false, null,
0, false, null,
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
amClient.addContainerRequest(storedGuarContainer1);
amClient.addContainerRequest(storedGuarContainer2);
@ -541,11 +579,13 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException {
amClient.addContainerRequest(storedContainer3);
// test addition and storage
int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
RemoteRequestsTable<ContainerRequest> remoteRequestsTable =
amClient.getTable(0);
int containersRequestedAny = remoteRequestsTable.get(priority,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
.remoteRequest.getNumContainers();
assertEquals(2, containersRequestedAny);
containersRequestedAny = amClient.remoteRequestsTable.get(priority1,
containersRequestedAny = remoteRequestsTable.get(priority1,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
.remoteRequest.getNumContainers();
assertEquals(1, containersRequestedAny);
@ -584,7 +624,7 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException {
amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability);
assertTrue(matches.isEmpty());
// 0 requests left. everything got cleaned up
assertTrue(amClient.remoteRequestsTable.isEmpty());
assertTrue(amClient.getTable(0).isEmpty());
// go through an exemplary allocation, matching and release cycle
amClient.addContainerRequest(storedContainer1);
@ -628,7 +668,7 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException {
assertEquals(0, amClient.ask.size());
assertEquals(0, allocResponse.getAllocatedContainers().size());
// 0 requests left. everything got cleaned up
assertTrue(amClient.remoteRequestsTable.isEmpty());
assertTrue(remoteRequestsTable.isEmpty());
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
@ -780,6 +820,16 @@ private int getAllocatedContainersNumber(
@Test (timeout=60000)
public void testAMRMClient() throws YarnException, IOException {
initAMRMClientAndTest(false);
}
@Test (timeout=60000)
public void testAMRMClientAllocReqId() throws YarnException, IOException {
initAMRMClientAndTest(true);
}
private void initAMRMClientAndTest(boolean useAllocReqId)
throws YarnException, IOException {
AMRMClient<ContainerRequest> amClient = null;
try {
// start am rm client
@ -796,7 +846,11 @@ public void testAMRMClient() throws YarnException, IOException {
amClient.registerApplicationMaster("Host", 10000, "");
testAllocation((AMRMClientImpl<ContainerRequest>)amClient);
if (useAllocReqId) {
testAllocRequestId((AMRMClientImpl<ContainerRequest>)amClient);
} else {
testAllocation((AMRMClientImpl<ContainerRequest>) amClient);
}
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
@ -1055,22 +1109,9 @@ private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
new ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new ContainerRequest(capability, nodes, racks, priority));
int containersRequestedNode = amClient.remoteRequestsTable.get(priority,
node, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
int containersRequestedRack = amClient.remoteRequestsTable.get(priority,
rack, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
.remoteRequest.getNumContainers();
assertEquals(2, containersRequestedNode);
assertEquals(2, containersRequestedRack);
assertEquals(2, containersRequestedAny);
assertEquals(3, amClient.ask.size());
assertEquals(0, amClient.release.size());
assertNumContainers(amClient, 0, 2, 2, 2, 3, 0);
int containersRequestedAny = 2;
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;
@ -1163,10 +1204,15 @@ public AllocateResponse answer(InvocationOnMock invocation)
// verify that the remove request made in between makeRequest and allocate
// has not been lost
assertEquals(0, snoopRequest.getNumContainers());
iterationsLeft = 3;
waitForContainerCompletion(3, amClient, releases);
}
private void waitForContainerCompletion(int numIterations,
AMRMClientImpl<ContainerRequest> amClient, Set<ContainerId> releases)
throws YarnException, IOException {
// do a few iterations to ensure RM is not going send new containers
while(!releases.isEmpty() || iterationsLeft-- > 0) {
while(!releases.isEmpty() || numIterations-- > 0) {
// inform RM of rejection
AllocateResponse allocResponse = amClient.allocate(0.1f);
// RM did not send new containers because AM does not need any
@ -1181,7 +1227,7 @@ public AllocateResponse answer(InvocationOnMock invocation)
}
}
}
if(iterationsLeft > 0) {
if(numIterations > 0) {
// sleep to make sure NM's heartbeat
sleep(100);
}
@ -1190,6 +1236,98 @@ public AllocateResponse answer(InvocationOnMock invocation)
assertEquals(0, amClient.release.size());
}
private void testAllocRequestId(
final AMRMClientImpl<ContainerRequest> amClient) throws YarnException,
IOException {
// setup container request
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
amClient.addContainerRequest(
new ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest(
new ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest(
new ContainerRequest(capability, nodes, racks, priority, 1));
amClient.addContainerRequest(
new ContainerRequest(capability, nodes, racks, priority, 1));
amClient.addContainerRequest(
new ContainerRequest(capability, nodes, racks, priority, 2));
amClient.addContainerRequest(
new ContainerRequest(capability, nodes, racks, priority, 2));
amClient.removeContainerRequest(
new ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new ContainerRequest(capability, nodes, racks, priority, 1));
amClient.removeContainerRequest(
new ContainerRequest(capability, nodes, racks, priority, 2));
assertNumContainers(amClient, 0, 1, 1, 1, 9, 0);
assertNumContainers(amClient, 1, 1, 1, 1, 9, 0);
assertNumContainers(amClient, 2, 1, 1, 1, 9, 0);
int containersRequestedAny = 3;
// RM should allocate container within 2 calls to allocate()
List<Container> allocatedContainers = new ArrayList<>();
int iterationsLeft = 5;
Set<ContainerId> releases = new TreeSet<ContainerId>();
while (allocatedContainers.size() < containersRequestedAny
&& iterationsLeft-- > 0) {
AllocateResponse allocResponse = amClient.allocate(0.1f);
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
allocatedContainers.addAll(allocResponse.getAllocatedContainers());
for(Container container : allocResponse.getAllocatedContainers()) {
ContainerId rejectContainerId = container.getId();
releases.add(rejectContainerId);
amClient.releaseAssignedContainer(rejectContainerId);
}
if(allocatedContainers.size() < containersRequestedAny) {
// sleep to let NM's heartbeat to RM and trigger allocations
sleep(100);
}
}
assertEquals(containersRequestedAny, allocatedContainers.size());
Set<Long> expAllocIds = new HashSet<>(
Arrays.asList(Long.valueOf(0), Long.valueOf(1), Long.valueOf(2)));
Set<Long> actAllocIds = new HashSet<>();
for (Container ac : allocatedContainers) {
actAllocIds.add(Long.valueOf(ac.getAllocationRequestId()));
}
assertEquals(expAllocIds, actAllocIds);
assertEquals(3, amClient.release.size());
assertEquals(0, amClient.ask.size());
waitForContainerCompletion(3, amClient, releases);
}
private void assertNumContainers(AMRMClientImpl<ContainerRequest> amClient,
long allocationReqId, int expNode, int expRack, int expAny,
int expAsks, int expRelease) {
RemoteRequestsTable<ContainerRequest> remoteRequestsTable =
amClient.getTable(allocationReqId);
int containersRequestedNode = remoteRequestsTable.get(priority,
node, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
int containersRequestedRack = remoteRequestsTable.get(priority,
rack, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
int containersRequestedAny = remoteRequestsTable.get(priority,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
.remoteRequest.getNumContainers();
assertEquals(expNode, containersRequestedNode);
assertEquals(expRack, containersRequestedRack);
assertEquals(expAny, containersRequestedAny);
assertEquals(expAsks, amClient.ask.size());
assertEquals(expRelease, amClient.release.size());
}
class CountDownSupplier implements Supplier<Boolean> {
int counter = 0;
@Override

View File

@ -61,7 +61,7 @@ public void testOpportunisticAndGuaranteedRequests() {
verifyResourceRequest(client, request, ResourceRequest.ANY, true);
ContainerRequest request2 =
new ContainerRequest(capability, new String[] {"host1", "host2"},
new String[] {"/rack2"}, Priority.newInstance(1), true, null,
new String[] {"/rack2"}, Priority.newInstance(1), 0, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true));
client.addContainerRequest(request2);
@ -274,8 +274,9 @@ private void verifyResourceRequest(
AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
String location, boolean expectedRelaxLocality,
ExecutionType executionType) {
ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority(),
location, executionType, request.getCapability()).remoteRequest;
ResourceRequest ask = client.getTable(0)
.get(request.getPriority(), location, executionType,
request.getCapability()).remoteRequest;
assertEquals(location, ask.getResourceName());
assertEquals(1, ask.getNumContainers());
assertEquals(expectedRelaxLocality, ask.getRelaxLocality());

View File

@ -366,12 +366,12 @@ public void testAMRMClient() throws Exception {
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority2,
true, null,
0, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority2,
true, null,
0, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
@ -381,21 +381,23 @@ public void testAMRMClient() throws Exception {
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority2,
true, null,
0, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
int containersRequestedNode = amClient.remoteRequestsTable.get(priority,
RemoteRequestsTable<ContainerRequest> remoteRequestsTable =
amClient.getTable(0);
int containersRequestedNode = remoteRequestsTable.get(priority,
node, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
int containersRequestedRack = amClient.remoteRequestsTable.get(priority,
int containersRequestedRack = remoteRequestsTable.get(priority,
rack, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
int containersRequestedAny = remoteRequestsTable.get(priority,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
.remoteRequest.getNumContainers();
int oppContainersRequestedAny =
amClient.remoteRequestsTable.get(priority2, ResourceRequest.ANY,
remoteRequestsTable.get(priority2, ResourceRequest.ANY,
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
.getNumContainers();
@ -457,7 +459,7 @@ public void testAMRMClient() throws Exception {
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
true, null,
0, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
assertEquals(4, amClient.ask.size());
@ -469,7 +471,7 @@ public void testAMRMClient() throws Exception {
nodes, racks, priority));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
true, null,
0, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
@ -490,7 +492,7 @@ public AllocateResponse answer(InvocationOnMock invocation)
priority));
amc.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null,
priority2, true, null,
priority2, 0, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
throw new Exception();
@ -571,7 +573,7 @@ public void testAMOpportunistic() throws Exception {
ExecutionTypeRequest execTypeRequest =
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true);
ContainerRequest containerRequest = new AMRMClient.ContainerRequest(
capability, nodes, racks, priority, true, null, execTypeRequest);
capability, nodes, racks, priority, 0, true, null, execTypeRequest);
amClient.addContainerRequest(containerRequest);
// Wait until the container is allocated

View File

@ -252,9 +252,9 @@ private Set<Container> allocateContainers(
racks, priority));
}
int containersRequestedAny = rmClient.remoteRequestsTable.get(priority,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
.remoteRequest.getNumContainers();
int containersRequestedAny = rmClient.getTable(0)
.get(priority, ResourceRequest.ANY, ExecutionType.GUARANTEED,
capability).remoteRequest.getNumContainers();
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;