YARN-5124. Modify AMRMClient to set the ExecutionType in the ResourceRequest. (asuresh)

(cherry picked from commit 5143277958)
This commit is contained in:
Arun Suresh 2016-06-12 09:42:38 -07:00
parent 5985221b46
commit fccb641942
11 changed files with 1191 additions and 573 deletions

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -109,7 +110,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
final Priority priority;
final boolean relaxLocality;
final String nodeLabelsExpression;
final ExecutionType executionType;
final ExecutionTypeRequest executionTypeRequest;
/**
* Instantiates a {@link ContainerRequest} with the given constraints and
@ -180,7 +181,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
Priority priority, boolean relaxLocality, String nodeLabelsExpression) {
this(capability, nodes, racks, priority, relaxLocality,
nodeLabelsExpression,
ExecutionType.GUARANTEED);
ExecutionTypeRequest.newInstance());
}
/**
@ -203,12 +204,12 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
* @param nodeLabelsExpression
* Set node labels to allocate resource, now we only support
* asking for only a single node label
* @param executionType
* @param executionTypeRequest
* Set the execution type of the container request.
*/
public ContainerRequest(Resource capability, String[] nodes, String[] racks,
Priority priority, boolean relaxLocality, String nodeLabelsExpression,
ExecutionType executionType) {
ExecutionTypeRequest executionTypeRequest) {
// Validate request
Preconditions.checkArgument(capability != null,
"The Resource to be requested for each container " +
@ -226,7 +227,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
this.priority = priority;
this.relaxLocality = relaxLocality;
this.nodeLabelsExpression = nodeLabelsExpression;
this.executionType = executionType;
this.executionTypeRequest = executionTypeRequest;
}
public Resource getCapability() {
@ -253,15 +254,16 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
return nodeLabelsExpression;
}
public ExecutionType getExecutionType() {
return executionType;
public ExecutionTypeRequest getExecutionTypeRequest() {
return executionTypeRequest;
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Capability[").append(capability).append("]");
sb.append("Priority[").append(priority).append("]");
sb.append("ExecutionType[").append(executionType).append("]");
sb.append("ExecutionTypeRequest[").append(executionTypeRequest)
.append("]");
return sb.toString();
}
}
@ -388,11 +390,36 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
* collection, requests will be returned in the same order as they were added.
* @return Collection of request matching the parameters
*/
@InterfaceStability.Evolving
public abstract List<? extends Collection<T>> getMatchingRequests(
Priority priority,
String resourceName,
Resource capability);
/**
* Get outstanding <code>ContainerRequest</code>s matching the given
* parameters. These ContainerRequests should have been added via
* <code>addContainerRequest</code> earlier in the lifecycle. For performance,
* the AMRMClient may return its internal collection directly without creating
* a copy. Users should not perform mutable operations on the return value.
* Each collection in the list contains requests with identical
* <code>Resource</code> size that fit in the given capability. In a
* collection, requests will be returned in the same order as they were added.
* specify an <code>ExecutionType</code> .
* @param priority Priority
* @param resourceName Location
* @param executionType ExecutionType
* @param capability Capability
* @return Collection of request matching the parameters
*/
@InterfaceStability.Evolving
public List<? extends Collection<T>> getMatchingRequests(
Priority priority, String resourceName, ExecutionType executionType,
Resource capability) {
throw new UnsupportedOperationException("The sub-class extending" +
" AMRMClient is expected to implement this !!");
}
/**
* Update application's blacklist with addition or removal resources.
*

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
@ -197,6 +198,22 @@ extends AbstractService {
String resourceName,
Resource capability);
/**
* Returns all matching ContainerRequests that match the given Priority,
* ResourceName, ExecutionType and Capability.
* @param priority Priority.
* @param resourceName Location.
* @param executionType ExecutionType.
* @param capability Capability.
* @return All matching ContainerRequests
*/
public List<? extends Collection<T>> getMatchingRequests(
Priority priority, String resourceName, ExecutionType executionType,
Resource capability) {
return client.getMatchingRequests(priority, resourceName,
executionType, capability);
}
/**
* Registers this application master with the resource manager. On successful
* registration, starts the heartbeating thread.

View File

@ -19,19 +19,19 @@
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.AbstractMap.SimpleEntry;
@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Priority;
@ -102,7 +104,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
protected final Set<String> blacklistAdditions = new HashSet<String>();
protected final Set<String> blacklistRemovals = new HashSet<String>();
class ResourceRequestInfo {
static class ResourceRequestInfo<T> {
ResourceRequest remoteRequest;
LinkedHashSet<T> containerRequests;
@ -115,11 +117,12 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
}
}
/**
* Class compares Resource by memory then cpu in reverse order
*/
class ResourceReverseMemoryThenCpuComparator implements Comparator<Resource> {
static class ResourceReverseMemoryThenCpuComparator implements
Comparator<Resource>, Serializable {
static final long serialVersionUID = 12345L;
@Override
public int compare(Resource arg0, Resource arg1) {
long mem0 = arg0.getMemorySize();
@ -151,16 +154,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
return (mem0 <= mem1 && cpu0 <= cpu1);
}
//Key -> Priority
//Value -> Map
//Key->ResourceName (e.g., nodename, rackname, *)
//Value->Map
//Key->Resource Capability
//Value->ResourceRequest
protected final
Map<Priority, Map<String, TreeMap<Resource, ResourceRequestInfo>>>
remoteRequestsTable =
new TreeMap<Priority, Map<String, TreeMap<Resource, ResourceRequestInfo>>>();
final RemoteRequestsTable remoteRequestsTable = new RemoteRequestsTable<T>();
protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
@ -185,6 +179,12 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
super(AMRMClientImpl.class.getName());
}
@VisibleForTesting
AMRMClientImpl(ApplicationMasterProtocol protocol) {
super(AMRMClientImpl.class.getName());
this.rmClient = protocol;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
RackResolver.init(conf);
@ -195,8 +195,10 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
protected void serviceStart() throws Exception {
final YarnConfiguration conf = new YarnConfiguration(getConfig());
try {
rmClient =
ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
if (rmClient == null) {
rmClient = ClientRMProxy.createRMProxy(
conf, ApplicationMasterProtocol.class);
}
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
@ -263,7 +265,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
// RPC layer is using it to send info across
askList.add(ResourceRequest.newInstance(r.getPriority(),
r.getResourceName(), r.getCapability(), r.getNumContainers(),
r.getRelaxLocality(), r.getNodeLabelExpression()));
r.getRelaxLocality(), r.getNodeLabelExpression(),
r.getExecutionTypeRequest()));
}
List<ContainerResourceChangeRequest> increaseList = new ArrayList<>();
List<ContainerResourceChangeRequest> decreaseList = new ArrayList<>();
@ -315,13 +318,11 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
synchronized (this) {
release.addAll(this.pendingRelease);
blacklistAdditions.addAll(this.blacklistedNodes);
for (Map<String, TreeMap<Resource, ResourceRequestInfo>> rr : remoteRequestsTable
.values()) {
for (Map<Resource, ResourceRequestInfo> capabalities : rr.values()) {
for (ResourceRequestInfo request : capabalities.values()) {
addResourceRequestToAsk(request.remoteRequest);
}
}
@SuppressWarnings("unchecked")
Iterator<ResourceRequestInfo<T>> reqIter =
remoteRequestsTable.iterator();
while (reqIter.hasNext()) {
addResourceRequestToAsk(reqIter.next().remoteRequest);
}
change.putAll(this.pendingChange);
}
@ -517,26 +518,28 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
+ joiner.join(req.getNodes()));
}
for (String node : dedupedNodes) {
addResourceRequest(req.getPriority(), node, req.getCapability(), req,
true, req.getNodeLabelExpression());
addResourceRequest(req.getPriority(), node,
req.getExecutionTypeRequest(), req.getCapability(), req, true,
req.getNodeLabelExpression());
}
}
for (String rack : dedupedRacks) {
addResourceRequest(req.getPriority(), rack, req.getCapability(), req,
true, req.getNodeLabelExpression());
addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(),
req.getCapability(), req, true, req.getNodeLabelExpression());
}
// Ensure node requests are accompanied by requests for
// corresponding rack
for (String rack : inferredRacks) {
addResourceRequest(req.getPriority(), rack, req.getCapability(), req,
req.getRelaxLocality(), req.getNodeLabelExpression());
addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(),
req.getCapability(), req, req.getRelaxLocality(),
req.getNodeLabelExpression());
}
// Off-switch
addResourceRequest(req.getPriority(), ResourceRequest.ANY,
req.getCapability(), req, req.getRelaxLocality(), req.getNodeLabelExpression());
req.getExecutionTypeRequest(), req.getCapability(), req,
req.getRelaxLocality(), req.getNodeLabelExpression());
}
@Override
@ -552,16 +555,18 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
// Update resource requests
if (req.getNodes() != null) {
for (String node : new HashSet<String>(req.getNodes())) {
decResourceRequest(req.getPriority(), node, req.getCapability(), req);
decResourceRequest(req.getPriority(), node,
req.getExecutionTypeRequest(), req.getCapability(), req);
}
}
for (String rack : allRacks) {
decResourceRequest(req.getPriority(), rack, req.getCapability(), req);
decResourceRequest(req.getPriority(), rack,
req.getExecutionTypeRequest(), req.getCapability(), req);
}
decResourceRequest(req.getPriority(), ResourceRequest.ANY,
req.getCapability(), req);
req.getExecutionTypeRequest(), req.getCapability(), req);
}
@Override
@ -604,44 +609,35 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
@Override
public synchronized List<? extends Collection<T>> getMatchingRequests(
Priority priority,
String resourceName,
Resource capability) {
Priority priority,
String resourceName,
Resource capability) {
return getMatchingRequests(priority, resourceName,
ExecutionType.GUARANTEED, capability);
}
@Override
public synchronized List<? extends Collection<T>> getMatchingRequests(
Priority priority, String resourceName, ExecutionType executionType,
Resource capability) {
Preconditions.checkArgument(capability != null,
"The Resource to be requested should not be null ");
Preconditions.checkArgument(priority != null,
"The priority at which to request containers should not be null ");
List<LinkedHashSet<T>> list = new LinkedList<LinkedHashSet<T>>();
Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
this.remoteRequestsTable.get(priority);
if (remoteRequests == null) {
return list;
}
TreeMap<Resource, ResourceRequestInfo> reqMap = remoteRequests
.get(resourceName);
if (reqMap == null) {
return list;
}
ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
if (resourceRequestInfo != null &&
!resourceRequestInfo.containerRequests.isEmpty()) {
list.add(resourceRequestInfo.containerRequests);
return list;
}
// no exact match. Container may be larger than what was requested.
@SuppressWarnings("unchecked")
List<ResourceRequestInfo<T>> matchingRequests =
this.remoteRequestsTable.getMatchingRequests(priority, resourceName,
executionType, capability);
// If no exact match. Container may be larger than what was requested.
// get all resources <= capability. map is reverse sorted.
SortedMap<Resource, ResourceRequestInfo> tailMap =
reqMap.tailMap(capability);
for(Map.Entry<Resource, ResourceRequestInfo> entry : tailMap.entrySet()) {
if (canFit(entry.getKey(), capability) &&
!entry.getValue().containerRequests.isEmpty()) {
// match found that fits in the larger resource
list.add(entry.getValue().containerRequests);
for (ResourceRequestInfo<T> resReqInfo : matchingRequests) {
if (canFit(resReqInfo.remoteRequest.getCapability(), capability) &&
!resReqInfo.containerRequests.isEmpty()) {
list.add(resReqInfo.containerRequests);
}
}
// no match found
return list;
}
@ -670,27 +666,23 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
*/
private void checkLocalityRelaxationConflict(Priority priority,
Collection<String> locations, boolean relaxLocality) {
Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
this.remoteRequestsTable.get(priority);
if (remoteRequests == null) {
return;
}
// Locality relaxation will be set to relaxLocality for all implicitly
// requested racks. Make sure that existing rack requests match this.
for (String location : locations) {
TreeMap<Resource, ResourceRequestInfo> reqs =
remoteRequests.get(location);
if (reqs != null && !reqs.isEmpty()) {
boolean existingRelaxLocality =
reqs.values().iterator().next().remoteRequest.getRelaxLocality();
if (relaxLocality != existingRelaxLocality) {
throw new InvalidContainerRequestException("Cannot submit a "
+ "ContainerRequest asking for location " + location
+ " with locality relaxation " + relaxLocality + " when it has "
+ "already been requested with locality relaxation " + existingRelaxLocality);
}
}
@SuppressWarnings("unchecked")
List<ResourceRequestInfo> allCapabilityMaps =
remoteRequestsTable.getAllResourceRequestInfos(priority, locations);
for (ResourceRequestInfo reqs : allCapabilityMaps) {
ResourceRequest remoteRequest = reqs.remoteRequest;
boolean existingRelaxLocality = remoteRequest.getRelaxLocality();
if (relaxLocality != existingRelaxLocality) {
throw new InvalidContainerRequestException("Cannot submit a "
+ "ContainerRequest asking for location "
+ remoteRequest.getResourceName() + " with locality relaxation "
+ relaxLocality + " when it has already been requested"
+ "with locality relaxation " + existingRelaxLocality);
}
}
}
/**
@ -747,46 +739,13 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
ask.add(remoteRequest);
}
private void
addResourceRequest(Priority priority, String resourceName,
Resource capability, T req, boolean relaxLocality,
String labelExpression) {
Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
this.remoteRequestsTable.get(priority);
if (remoteRequests == null) {
remoteRequests =
new HashMap<String, TreeMap<Resource, ResourceRequestInfo>>();
this.remoteRequestsTable.put(priority, remoteRequests);
if (LOG.isDebugEnabled()) {
LOG.debug("Added priority=" + priority);
}
}
TreeMap<Resource, ResourceRequestInfo> reqMap =
remoteRequests.get(resourceName);
if (reqMap == null) {
// capabilities are stored in reverse sorted order. smallest last.
reqMap = new TreeMap<Resource, ResourceRequestInfo>(
new ResourceReverseMemoryThenCpuComparator());
remoteRequests.put(resourceName, reqMap);
}
ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
if (resourceRequestInfo == null) {
resourceRequestInfo =
new ResourceRequestInfo(priority, resourceName, capability,
relaxLocality);
reqMap.put(capability, resourceRequestInfo);
}
resourceRequestInfo.remoteRequest.setNumContainers(
resourceRequestInfo.remoteRequest.getNumContainers() + 1);
if (relaxLocality) {
resourceRequestInfo.containerRequests.add(req);
}
if (ResourceRequest.ANY.equals(resourceName)) {
resourceRequestInfo.remoteRequest.setNodeLabelExpression(labelExpression);
}
private void addResourceRequest(Priority priority, String resourceName,
ExecutionTypeRequest execTypeReq, Resource capability, T req,
boolean relaxLocality, String labelExpression) {
@SuppressWarnings("unchecked")
ResourceRequestInfo resourceRequestInfo = remoteRequestsTable
.addResourceRequest(priority, resourceName,
execTypeReq, capability, req, relaxLocality, labelExpression);
// Note this down for next interaction with ResourceManager
addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
@ -800,70 +759,31 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
}
}
private void decResourceRequest(Priority priority,
String resourceName,
Resource capability,
T req) {
Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
this.remoteRequestsTable.get(priority);
if(remoteRequests == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Not decrementing resource as priority " + priority
+ " is not present in request table");
}
return;
}
Map<Resource, ResourceRequestInfo> reqMap = remoteRequests.get(resourceName);
if (reqMap == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Not decrementing resource as " + resourceName
+ " is not present in request table");
}
return;
}
ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
if (LOG.isDebugEnabled()) {
LOG.debug("BEFORE decResourceRequest:" + " applicationId="
+ " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers="
+ resourceRequestInfo.remoteRequest.getNumContainers()
+ " #asks=" + ask.size());
}
resourceRequestInfo.remoteRequest.setNumContainers(
resourceRequestInfo.remoteRequest.getNumContainers() - 1);
resourceRequestInfo.containerRequests.remove(req);
if(resourceRequestInfo.remoteRequest.getNumContainers() < 0) {
// guard against spurious removals
resourceRequestInfo.remoteRequest.setNumContainers(0);
}
private void decResourceRequest(Priority priority, String resourceName,
ExecutionTypeRequest execTypeReq, Resource capability, T req) {
@SuppressWarnings("unchecked")
ResourceRequestInfo resourceRequestInfo =
remoteRequestsTable.decResourceRequest(priority, resourceName,
execTypeReq, capability, req);
// send the ResourceRequest to RM even if is 0 because it needs to override
// a previously sent value. If ResourceRequest was not sent previously then
// sending 0 aught to be a no-op on RM
addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
if (resourceRequestInfo != null) {
addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
// delete entries from map if no longer needed
if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) {
reqMap.remove(capability);
if (reqMap.size() == 0) {
remoteRequests.remove(resourceName);
// delete entry from map if no longer needed
if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) {
this.remoteRequestsTable.remove(priority, resourceName,
execTypeReq.getExecutionType(), capability);
}
if (remoteRequests.size() == 0) {
remoteRequestsTable.remove(priority);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("AFTER decResourceRequest:" + " applicationId="
+ " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers="
+ resourceRequestInfo.remoteRequest.getNumContainers()
+ " #asks=" + ask.size());
if (LOG.isDebugEnabled()) {
LOG.debug("AFTER decResourceRequest:" + " applicationId="
+ " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers="
+ resourceRequestInfo.remoteRequest.getNumContainers()
+ " #asks=" + ask.size());
}
}
}

View File

@ -0,0 +1,332 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ResourceRequestInfo;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ResourceReverseMemoryThenCpuComparator;
class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
private static final Log LOG = LogFactory.getLog(RemoteRequestsTable.class);
static ResourceReverseMemoryThenCpuComparator resourceComparator =
new ResourceReverseMemoryThenCpuComparator();
/**
* Nested Iterator that iterates over just the ResourceRequestInfo
* object.
*/
class RequestInfoIterator implements Iterator<ResourceRequestInfo> {
private Iterator<Map<String, Map<ExecutionType, TreeMap<Resource,
ResourceRequestInfo>>>> iLocMap;
private Iterator<Map<ExecutionType, TreeMap<Resource,
ResourceRequestInfo>>> iExecTypeMap;
private Iterator<TreeMap<Resource, ResourceRequestInfo>> iCapMap;
private Iterator<ResourceRequestInfo> iResReqInfo;
public RequestInfoIterator(Iterator<Map<String,
Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>>>
iLocationMap) {
this.iLocMap = iLocationMap;
if (iLocMap.hasNext()) {
iExecTypeMap = iLocMap.next().values().iterator();
} else {
iExecTypeMap =
new LinkedList<Map<ExecutionType, TreeMap<Resource,
ResourceRequestInfo>>>().iterator();
}
if (iExecTypeMap.hasNext()) {
iCapMap = iExecTypeMap.next().values().iterator();
} else {
iCapMap =
new LinkedList<TreeMap<Resource, ResourceRequestInfo>>()
.iterator();
}
if (iCapMap.hasNext()) {
iResReqInfo = iCapMap.next().values().iterator();
} else {
iResReqInfo = new LinkedList<ResourceRequestInfo>().iterator();
}
}
@Override
public boolean hasNext() {
return iLocMap.hasNext()
|| iExecTypeMap.hasNext()
|| iCapMap.hasNext()
|| iResReqInfo.hasNext();
}
@Override
public ResourceRequestInfo next() {
if (!iResReqInfo.hasNext()) {
if (!iCapMap.hasNext()) {
if (!iExecTypeMap.hasNext()) {
iExecTypeMap = iLocMap.next().values().iterator();
}
iCapMap = iExecTypeMap.next().values().iterator();
}
iResReqInfo = iCapMap.next().values().iterator();
}
return iResReqInfo.next();
}
@Override
public void remove() {
throw new UnsupportedOperationException("Remove is not supported" +
"for this iterator !!");
}
}
// Nest map with Primary key :
// Priority -> ResourceName(String) -> ExecutionType -> Capability(Resource)
// and value : ResourceRequestInfo
private Map<Priority, Map<String, Map<ExecutionType, TreeMap<Resource,
ResourceRequestInfo>>>> remoteRequestsTable = new HashMap<>();
@Override
public Iterator<ResourceRequestInfo> iterator() {
return new RequestInfoIterator(remoteRequestsTable.values().iterator());
}
ResourceRequestInfo get(Priority priority, String location,
ExecutionType execType, Resource capability) {
TreeMap<Resource, ResourceRequestInfo> capabilityMap =
getCapabilityMap(priority, location, execType);
if (capabilityMap == null) {
return null;
}
return capabilityMap.get(capability);
}
void put(Priority priority, String resourceName, ExecutionType execType,
Resource capability, ResourceRequestInfo resReqInfo) {
Map<String, Map<ExecutionType, TreeMap<Resource,
ResourceRequestInfo>>> locationMap =
remoteRequestsTable.get(priority);
if (locationMap == null) {
locationMap = new HashMap<>();
this.remoteRequestsTable.put(priority, locationMap);
if (LOG.isDebugEnabled()) {
LOG.debug("Added priority=" + priority);
}
}
Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>> execTypeMap =
locationMap.get(resourceName);
if (execTypeMap == null) {
execTypeMap = new HashMap<>();
locationMap.put(resourceName, execTypeMap);
if (LOG.isDebugEnabled()) {
LOG.debug("Added resourceName=" + resourceName);
}
}
TreeMap<Resource, ResourceRequestInfo> capabilityMap =
execTypeMap.get(execType);
if (capabilityMap == null) {
capabilityMap = new TreeMap<>(resourceComparator);
execTypeMap.put(execType, capabilityMap);
if (LOG.isDebugEnabled()) {
LOG.debug("Added Execution Type=" + execType);
}
}
capabilityMap.put(capability, resReqInfo);
}
ResourceRequestInfo remove(Priority priority, String resourceName,
ExecutionType execType, Resource capability) {
ResourceRequestInfo retVal = null;
Map<String, Map<ExecutionType, TreeMap<Resource,
ResourceRequestInfo>>> locationMap = remoteRequestsTable.get(priority);
if (locationMap == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("No such priority=" + priority);
}
return null;
}
Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
execTypeMap = locationMap.get(resourceName);
if (execTypeMap == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("No such resourceName=" + resourceName);
}
return null;
}
TreeMap<Resource, ResourceRequestInfo> capabilityMap =
execTypeMap.get(execType);
if (capabilityMap == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("No such Execution Type=" + execType);
}
return null;
}
retVal = capabilityMap.remove(capability);
if (capabilityMap.size() == 0) {
execTypeMap.remove(execType);
if (execTypeMap.size() == 0) {
locationMap.remove(resourceName);
if (locationMap.size() == 0) {
this.remoteRequestsTable.remove(priority);
}
}
}
return retVal;
}
Map<String, Map<ExecutionType, TreeMap<Resource,
ResourceRequestInfo>>> getLocationMap(Priority priority) {
return remoteRequestsTable.get(priority);
}
Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
getExecutionTypeMap(Priority priority, String location) {
Map<String, Map<ExecutionType, TreeMap<Resource,
ResourceRequestInfo>>> locationMap = getLocationMap(priority);
if (locationMap == null) {
return null;
}
return locationMap.get(location);
}
TreeMap<Resource, ResourceRequestInfo> getCapabilityMap(Priority
priority, String location,
ExecutionType execType) {
Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
executionTypeMap = getExecutionTypeMap(priority, location);
if (executionTypeMap == null) {
return null;
}
return executionTypeMap.get(execType);
}
@SuppressWarnings("unchecked")
List<ResourceRequestInfo> getAllResourceRequestInfos(Priority priority,
Collection<String> locations) {
List retList = new LinkedList<>();
for (String location : locations) {
for (ExecutionType eType : ExecutionType.values()) {
TreeMap<Resource, ResourceRequestInfo> capabilityMap =
getCapabilityMap(priority, location, eType);
if (capabilityMap != null) {
retList.addAll(capabilityMap.values());
}
}
}
return retList;
}
List<ResourceRequestInfo> getMatchingRequests(
Priority priority, String resourceName, ExecutionType executionType,
Resource capability) {
List<ResourceRequestInfo> list = new LinkedList<>();
TreeMap<Resource, ResourceRequestInfo> capabilityMap =
getCapabilityMap(priority, resourceName, executionType);
if (capabilityMap != null) {
ResourceRequestInfo resourceRequestInfo = capabilityMap.get(capability);
if (resourceRequestInfo != null) {
list.add(resourceRequestInfo);
} else {
list.addAll(capabilityMap.tailMap(capability).values());
}
}
return list;
}
@SuppressWarnings("unchecked")
ResourceRequestInfo addResourceRequest(Priority priority, String resourceName,
ExecutionTypeRequest execTypeReq, Resource capability, T req,
boolean relaxLocality, String labelExpression) {
ResourceRequestInfo resourceRequestInfo = get(priority, resourceName,
execTypeReq.getExecutionType(), capability);
if (resourceRequestInfo == null) {
resourceRequestInfo =
new ResourceRequestInfo(priority, resourceName, capability,
relaxLocality);
put(priority, resourceName, execTypeReq.getExecutionType(), capability,
resourceRequestInfo);
}
resourceRequestInfo.remoteRequest.setExecutionTypeRequest(execTypeReq);
resourceRequestInfo.remoteRequest.setNumContainers(
resourceRequestInfo.remoteRequest.getNumContainers() + 1);
if (relaxLocality) {
resourceRequestInfo.containerRequests.add(req);
}
if (ResourceRequest.ANY.equals(resourceName)) {
resourceRequestInfo.remoteRequest.setNodeLabelExpression(labelExpression);
}
return resourceRequestInfo;
}
ResourceRequestInfo decResourceRequest(Priority priority, String resourceName,
ExecutionTypeRequest execTypeReq, Resource capability, T req) {
ResourceRequestInfo resourceRequestInfo = get(priority, resourceName,
execTypeReq.getExecutionType(), capability);
if (resourceRequestInfo == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Not decrementing resource as ResourceRequestInfo with" +
"priority=" + priority + ", " +
"resourceName=" + resourceName + ", " +
"executionType=" + execTypeReq + ", " +
"capability=" + capability + " is not present in request table");
}
return null;
}
if (LOG.isDebugEnabled()) {
LOG.debug("BEFORE decResourceRequest:" + " applicationId="
+ " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers="
+ resourceRequestInfo.remoteRequest.getNumContainers());
}
resourceRequestInfo.remoteRequest.setNumContainers(
resourceRequestInfo.remoteRequest.getNumContainers() - 1);
resourceRequestInfo.containerRequests.remove(req);
if (resourceRequestInfo.remoteRequest.getNumContainers() < 0) {
// guard against spurious removals
resourceRequestInfo.remoteRequest.setNumContainers(0);
}
return resourceRequestInfo;
}
boolean isEmpty() {
return remoteRequestsTable.isEmpty();
}
}

View File

@ -0,0 +1,197 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
/**
* Base test case to be used for Testing frameworks that use AMRMProxy.
*/
public abstract class BaseAMRMProxyE2ETest {
protected ApplicationMasterProtocol createAMRMProtocol(YarnClient rmClient,
ApplicationId appId, MiniYARNCluster cluster,
final Configuration yarnConf)
throws IOException, InterruptedException, YarnException {
UserGroupInformation user = null;
// Get the AMRMToken from AMRMProxy
ApplicationReport report = rmClient.getApplicationReport(appId);
user = UserGroupInformation.createProxyUser(
report.getCurrentApplicationAttemptId().toString(),
UserGroupInformation.getCurrentUser());
ContainerManagerImpl containerManager = (ContainerManagerImpl) cluster
.getNodeManager(0).getNMContext().getContainerManager();
AMRMProxyTokenSecretManager amrmTokenSecretManager =
containerManager.getAMRMProxyService().getSecretManager();
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
amrmTokenSecretManager
.createAndGetAMRMToken(report.getCurrentApplicationAttemptId());
SecurityUtil.setTokenService(token,
containerManager.getAMRMProxyService().getBindAddress());
user.addToken(token);
// Start Application Master
return user
.doAs(new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
@Override
public ApplicationMasterProtocol run() throws Exception {
return ClientRMProxy.createRMProxy(yarnConf,
ApplicationMasterProtocol.class);
}
});
}
protected AllocateRequest createAllocateRequest(List<NodeReport> listNode) {
// The test needs AMRMClient to create a real allocate request
AMRMClientImpl<AMRMClient.ContainerRequest> amClient =
new AMRMClientImpl<>();
Resource capability = Resource.newInstance(1024, 2);
Priority priority = Priority.newInstance(1);
List<NodeReport> nodeReports = listNode;
String node = nodeReports.get(0).getNodeId().getHost();
String[] nodes = new String[] {node};
AMRMClient.ContainerRequest storedContainer1 =
new AMRMClient.ContainerRequest(capability, nodes, null, priority);
amClient.addContainerRequest(storedContainer1);
amClient.addContainerRequest(storedContainer1);
List<ResourceRequest> resourceAsk = new ArrayList<>();
for (ResourceRequest rr : amClient.ask) {
resourceAsk.add(rr);
}
ResourceBlacklistRequest resourceBlacklistRequest = ResourceBlacklistRequest
.newInstance(new ArrayList<>(), new ArrayList<>());
int responseId = 1;
return AllocateRequest.newInstance(responseId, 0, resourceAsk,
new ArrayList<>(), resourceBlacklistRequest);
}
protected ApplicationAttemptId createApp(YarnClient yarnClient,
MiniYARNCluster yarnCluster, Configuration conf) throws Exception {
ApplicationSubmissionContext appContext =
yarnClient.createApplication().getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();
appContext.setApplicationName("Test");
Priority pri = Records.newRecord(Priority.class);
pri.setPriority(0);
appContext.setPriority(pri);
appContext.setQueue("default");
ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext(
Collections.<String, LocalResource> emptyMap(),
new HashMap<String, String>(), Arrays.asList("sleep", "10000"),
new HashMap<String, ByteBuffer>(), null,
new HashMap<ApplicationAccessType, String>());
appContext.setAMContainerSpec(amContainer);
appContext.setResource(Resource.newInstance(1024, 1));
SubmitApplicationRequest appRequest =
Records.newRecord(SubmitApplicationRequest.class);
appRequest.setApplicationSubmissionContext(appContext);
yarnClient.submitApplication(appContext);
RMAppAttempt appAttempt = null;
ApplicationAttemptId attemptId = null;
while (true) {
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
if (appReport
.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
attemptId =
appReport.getCurrentApplicationAttemptId();
appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
.get(attemptId.getApplicationId()).getCurrentAppAttempt();
while (true) {
if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
break;
}
}
break;
}
}
Thread.sleep(1000);
// Just dig into the ResourceManager and get the AMRMToken just for the sake
// of testing.
UserGroupInformation.setLoginUser(UserGroupInformation
.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
// emulate RM setup of AMRM token in credentials by adding the token
// *before* setting the token service
UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
appAttempt.getAMRMToken().setService(
ClientRMProxy.getAMRMTokenService(conf));
return attemptId;
}
}

View File

@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NMToken;
@ -413,11 +414,13 @@ public class TestAMRMClient {
amClient.addContainerRequest(storedContainer3);
// test addition and storage
int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
.get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
.remoteRequest.getNumContainers();
assertEquals(2, containersRequestedAny);
containersRequestedAny = amClient.remoteRequestsTable.get(priority1)
.get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
containersRequestedAny = amClient.remoteRequestsTable.get(priority1,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
.remoteRequest.getNumContainers();
assertEquals(1, containersRequestedAny);
List<? extends Collection<ContainerRequest>> matches =
amClient.getMatchingRequests(priority, node, capability);
@ -919,12 +922,15 @@ public class TestAMRMClient {
amClient.removeContainerRequest(
new ContainerRequest(capability, nodes, racks, priority));
int containersRequestedNode = amClient.remoteRequestsTable.get(priority)
.get(node).get(capability).remoteRequest.getNumContainers();
int containersRequestedRack = amClient.remoteRequestsTable.get(priority)
.get(rack).get(capability).remoteRequest.getNumContainers();
int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
.get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
int containersRequestedNode = amClient.remoteRequestsTable.get(priority,
node, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
int containersRequestedRack = amClient.remoteRequestsTable.get(priority,
rack, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
.remoteRequest.getNumContainers();
assertEquals(2, containersRequestedNode);
assertEquals(2, containersRequestedRack);

View File

@ -26,6 +26,8 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -35,6 +37,46 @@ import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
import org.junit.Test;
public class TestAMRMClientContainerRequest {
@Test
public void testOpportunisticAndGuaranteedRequests() {
AMRMClientImpl<ContainerRequest> client =
new AMRMClientImpl<ContainerRequest>();
Configuration conf = new Configuration();
conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
client.init(conf);
Resource capability = Resource.newInstance(1024, 1);
ContainerRequest request =
new ContainerRequest(capability, new String[] {"host1", "host2"},
new String[] {"/rack2"}, Priority.newInstance(1));
client.addContainerRequest(request);
verifyResourceRequest(client, request, "host1", true);
verifyResourceRequest(client, request, "host2", true);
verifyResourceRequest(client, request, "/rack1", true);
verifyResourceRequest(client, request, "/rack2", true);
verifyResourceRequest(client, request, ResourceRequest.ANY, true);
ContainerRequest request2 =
new ContainerRequest(capability, new String[] {"host1", "host2"},
new String[] {"/rack2"}, Priority.newInstance(1), true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true));
client.addContainerRequest(request2);
verifyResourceRequest(client, request, "host1", true,
ExecutionType.OPPORTUNISTIC);
verifyResourceRequest(client, request, "host2", true,
ExecutionType.OPPORTUNISTIC);
verifyResourceRequest(client, request, "/rack1", true,
ExecutionType.OPPORTUNISTIC);
verifyResourceRequest(client, request, "/rack2", true,
ExecutionType.OPPORTUNISTIC);
verifyResourceRequest(client, request, ResourceRequest.ANY, true,
ExecutionType.OPPORTUNISTIC);
}
@Test
public void testFillInRacks() {
AMRMClientImpl<ContainerRequest> client =
@ -224,8 +266,16 @@ public class TestAMRMClientContainerRequest {
private void verifyResourceRequest(
AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
String location, boolean expectedRelaxLocality) {
ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority())
.get(location).get(request.getCapability()).remoteRequest;
verifyResourceRequest(client, request, location, expectedRelaxLocality,
ExecutionType.GUARANTEED);
}
private void verifyResourceRequest(
AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
String location, boolean expectedRelaxLocality,
ExecutionType executionType) {
ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority(),
location, executionType, request.getCapability()).remoteRequest;
assertEquals(location, ask.getResourceName());
assertEquals(1, ask.getNumContainers());
assertEquals(expectedRelaxLocality, ask.getRelaxLocality());

View File

@ -19,20 +19,12 @@
package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@ -40,43 +32,25 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
public class TestAMRMProxy {
/**
* End-to-End test cases for the AMRMProxy Service.
*/
public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
private static final Log LOG = LogFactory.getLog(TestAMRMProxy.class);
@ -84,7 +58,7 @@ public class TestAMRMProxy {
* This test validates register, allocate and finish of an application through
* the AMRMPRoxy.
*/
@Test(timeout = 60000)
@Test(timeout = 120000)
public void testAMRMProxyE2E() throws Exception {
MiniYARNCluster cluster = new MiniYARNCluster("testAMRMProxyE2E", 1, 1, 1);
YarnClient rmClient = null;
@ -107,7 +81,8 @@ public class TestAMRMProxy {
// Submit application
ApplicationId appId = createApp(rmClient, cluster);
ApplicationAttemptId appAttmptId = createApp(rmClient, cluster, conf);
ApplicationId appId = appAttmptId.getApplicationId();
client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
@ -173,7 +148,7 @@ public class TestAMRMProxy {
* that the received token it is different from the previous one within 5
* requests.
*/
@Test(timeout = 60000)
@Test(timeout = 120000)
public void testE2ETokenRenewal() throws Exception {
MiniYARNCluster cluster =
new MiniYARNCluster("testE2ETokenRenewal", 1, 1, 1);
@ -201,7 +176,8 @@ public class TestAMRMProxy {
// Submit
ApplicationId appId = createApp(rmClient, cluster);
ApplicationAttemptId appAttmptId = createApp(rmClient, cluster, conf);
ApplicationId appId = appAttmptId.getApplicationId();
client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
@ -252,7 +228,7 @@ public class TestAMRMProxy {
* This test validates that an AM cannot register directly to the RM, with the
* token provided by the AMRMProxy.
*/
@Test(timeout = 60000)
@Test(timeout = 120000)
public void testE2ETokenSwap() throws Exception {
MiniYARNCluster cluster = new MiniYARNCluster("testE2ETokenSwap", 1, 1, 1);
YarnClient rmClient = null;
@ -270,7 +246,8 @@ public class TestAMRMProxy {
rmClient.init(yarnConf);
rmClient.start();
ApplicationId appId = createApp(rmClient, cluster);
ApplicationAttemptId appAttmptId = createApp(rmClient, cluster, conf);
ApplicationId appId = appAttmptId.getApplicationId();
client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
@ -290,124 +267,4 @@ public class TestAMRMProxy {
cluster.stop();
}
}
protected ApplicationMasterProtocol createAMRMProtocol(YarnClient rmClient,
ApplicationId appId, MiniYARNCluster cluster,
final Configuration yarnConf)
throws IOException, InterruptedException, YarnException {
UserGroupInformation user = null;
// Get the AMRMToken from AMRMProxy
ApplicationReport report = rmClient.getApplicationReport(appId);
user = UserGroupInformation.createProxyUser(
report.getCurrentApplicationAttemptId().toString(),
UserGroupInformation.getCurrentUser());
ContainerManagerImpl containerManager = (ContainerManagerImpl) cluster
.getNodeManager(0).getNMContext().getContainerManager();
AMRMProxyTokenSecretManager amrmTokenSecretManager =
containerManager.getAMRMProxyService().getSecretManager();
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
amrmTokenSecretManager
.createAndGetAMRMToken(report.getCurrentApplicationAttemptId());
SecurityUtil.setTokenService(token,
containerManager.getAMRMProxyService().getBindAddress());
user.addToken(token);
// Start Application Master
return user
.doAs(new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
@Override
public ApplicationMasterProtocol run() throws Exception {
return ClientRMProxy.createRMProxy(yarnConf,
ApplicationMasterProtocol.class);
}
});
}
protected AllocateRequest createAllocateRequest(List<NodeReport> listNode) {
// The test needs AMRMClient to create a real allocate request
AMRMClientImpl<ContainerRequest> amClient =
new AMRMClientImpl<ContainerRequest>();
Resource capability = Resource.newInstance(1024, 2);
Priority priority = Priority.newInstance(1);
List<NodeReport> nodeReports = listNode;
String node = nodeReports.get(0).getNodeId().getHost();
String[] nodes = new String[] { node };
ContainerRequest storedContainer1 =
new ContainerRequest(capability, nodes, null, priority);
amClient.addContainerRequest(storedContainer1);
amClient.addContainerRequest(storedContainer1);
List<ResourceRequest> resourceAsk = new ArrayList<ResourceRequest>();
for (ResourceRequest rr : amClient.ask) {
resourceAsk.add(rr);
}
ResourceBlacklistRequest resourceBlacklistRequest = ResourceBlacklistRequest
.newInstance(new ArrayList<String>(), new ArrayList<String>());
int responseId = 1;
return AllocateRequest.newInstance(responseId, 0, resourceAsk,
new ArrayList<ContainerId>(), resourceBlacklistRequest);
}
protected ApplicationId createApp(YarnClient yarnClient,
MiniYARNCluster yarnCluster) throws Exception {
ApplicationSubmissionContext appContext =
yarnClient.createApplication().getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();
appContext.setApplicationName("Test");
Priority pri = Records.newRecord(Priority.class);
pri.setPriority(0);
appContext.setPriority(pri);
appContext.setQueue("default");
ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext(
Collections.<String, LocalResource> emptyMap(),
new HashMap<String, String>(), Arrays.asList("sleep", "10000"),
new HashMap<String, ByteBuffer>(), null,
new HashMap<ApplicationAccessType, String>());
appContext.setAMContainerSpec(amContainer);
appContext.setResource(Resource.newInstance(1024, 1));
SubmitApplicationRequest appRequest =
Records.newRecord(SubmitApplicationRequest.class);
appRequest.setApplicationSubmissionContext(appContext);
yarnClient.submitApplication(appContext);
RMAppAttempt appAttempt = null;
while (true) {
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
if (appReport
.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
ApplicationAttemptId attemptId =
appReport.getCurrentApplicationAttemptId();
appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
.get(attemptId.getApplicationId()).getCurrentAppAttempt();
while (true) {
if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
break;
}
}
break;
}
}
Thread.sleep(1000);
return appId;
}
}

View File

@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -22,20 +22,31 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@ -43,12 +54,23 @@ import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Validates End2End Distributed Scheduling flow which includes the AM
@ -57,11 +79,70 @@ import java.util.List;
* the NM and the DistributedSchedulingProtocol used by the framework to talk
* to the DistributedSchedulingService running on the RM.
*/
public class TestDistributedScheduling extends TestAMRMProxy {
public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
private static final Log LOG =
LogFactory.getLog(TestDistributedScheduling.class);
protected MiniYARNCluster cluster;
protected YarnClient rmClient;
protected ApplicationMasterProtocol client;
protected Configuration conf;
protected Configuration yarnConf;
protected ApplicationAttemptId attemptId;
protected ApplicationId appId;
@Before
public void doBefore() throws Exception {
cluster = new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1);
conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
cluster.init(conf);
cluster.start();
yarnConf = cluster.getConfig();
// the client has to connect to AMRMProxy
yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
rmClient = YarnClient.createYarnClient();
rmClient.init(yarnConf);
rmClient.start();
// Submit application
attemptId = createApp(rmClient, cluster, conf);
appId = attemptId.getApplicationId();
client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
}
@After
public void doAfter() throws Exception {
if (client != null) {
try {
client.finishApplicationMaster(FinishApplicationMasterRequest
.newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
rmClient.killApplication(attemptId.getApplicationId());
attemptId = null;
} catch (Exception e) {
}
}
if (rmClient != null) {
try {
rmClient.stop();
} catch (Exception e) {
}
}
if (cluster != null) {
try {
cluster.stop();
} catch (Exception e) {
}
}
}
/**
* Validates if Allocate Requests containing only OPPORTUNISTIC container
* requests are satisfied instantly.
@ -70,104 +151,63 @@ public class TestDistributedScheduling extends TestAMRMProxy {
*/
@Test(timeout = 60000)
public void testOpportunisticExecutionTypeRequestE2E() throws Exception {
MiniYARNCluster cluster =
new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1);
YarnClient rmClient = null;
ApplicationMasterProtocol client;
LOG.info("testDistributedSchedulingE2E - Register");
try {
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
cluster.init(conf);
cluster.start();
final Configuration yarnConf = cluster.getConfig();
RegisterApplicationMasterResponse responseRegister =
client.registerApplicationMaster(RegisterApplicationMasterRequest
.newInstance(NetUtils.getHostname(), 1024, ""));
// the client has to connect to AMRMProxy
Assert.assertNotNull(responseRegister);
Assert.assertNotNull(responseRegister.getQueue());
Assert.assertNotNull(responseRegister.getApplicationACLs());
Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
Assert
.assertNotNull(responseRegister.getContainersFromPreviousAttempts());
Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
rmClient = YarnClient.createYarnClient();
rmClient.init(yarnConf);
rmClient.start();
RMApp rmApp =
cluster.getResourceManager().getRMContext().getRMApps().get(appId);
Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
// Submit application
LOG.info("testDistributedSchedulingE2E - Allocate");
ApplicationId appId = createApp(rmClient, cluster);
AllocateRequest request =
createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
LOG.info("testDistributedSchedulingE2E - Register");
RegisterApplicationMasterResponse responseRegister =
client.registerApplicationMaster(RegisterApplicationMasterRequest
.newInstance(NetUtils.getHostname(), 1024, ""));
Assert.assertNotNull(responseRegister);
Assert.assertNotNull(responseRegister.getQueue());
Assert.assertNotNull(responseRegister.getApplicationACLs());
Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
Assert
.assertNotNull(responseRegister.getContainersFromPreviousAttempts());
Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
RMApp rmApp =
cluster.getResourceManager().getRMContext().getRMApps().get(appId);
Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
LOG.info("testDistributedSchedulingE2E - Allocate");
AllocateRequest request =
createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
// Replace 'ANY' requests with OPPORTUNISTIC aks and remove
// everything else
List<ResourceRequest> newAskList = new ArrayList<>();
for (ResourceRequest rr : request.getAskList()) {
if (ResourceRequest.ANY.equals(rr.getResourceName())) {
ResourceRequest newRR = ResourceRequest.newInstance(rr
.getPriority(), rr.getResourceName(),
rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
rr.getNodeLabelExpression(),
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true));
newAskList.add(newRR);
}
// Replace 'ANY' requests with OPPORTUNISTIC aks and remove
// everything else
List<ResourceRequest> newAskList = new ArrayList<>();
for (ResourceRequest rr : request.getAskList()) {
if (ResourceRequest.ANY.equals(rr.getResourceName())) {
ResourceRequest newRR = ResourceRequest.newInstance(rr
.getPriority(), rr.getResourceName(),
rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
rr.getNodeLabelExpression(),
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true));
newAskList.add(newRR);
}
request.setAskList(newAskList);
AllocateResponse allocResponse = client.allocate(request);
Assert.assertNotNull(allocResponse);
// Ensure that all the requests are satisfied immediately
Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
// Verify that the allocated containers are OPPORTUNISTIC
for (Container allocatedContainer : allocResponse
.getAllocatedContainers()) {
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
.newContainerTokenIdentifier(
allocatedContainer.getContainerToken());
Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
containerTokenIdentifier.getExecutionType());
}
LOG.info("testDistributedSchedulingE2E - Finish");
FinishApplicationMasterResponse responseFinish =
client.finishApplicationMaster(FinishApplicationMasterRequest
.newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
Assert.assertNotNull(responseFinish);
} finally {
if (rmClient != null) {
rmClient.stop();
}
cluster.stop();
}
request.setAskList(newAskList);
AllocateResponse allocResponse = client.allocate(request);
Assert.assertNotNull(allocResponse);
// Ensure that all the requests are satisfied immediately
Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
// Verify that the allocated containers are OPPORTUNISTIC
for (Container allocatedContainer : allocResponse
.getAllocatedContainers()) {
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
.newContainerTokenIdentifier(
allocatedContainer.getContainerToken());
Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
containerTokenIdentifier.getExecutionType());
}
LOG.info("testDistributedSchedulingE2E - Finish");
}
/**
@ -178,135 +218,305 @@ public class TestDistributedScheduling extends TestAMRMProxy {
*/
@Test(timeout = 60000)
public void testMixedExecutionTypeRequestE2E() throws Exception {
MiniYARNCluster cluster =
new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1);
YarnClient rmClient = null;
ApplicationMasterProtocol client;
LOG.info("testDistributedSchedulingE2E - Register");
RegisterApplicationMasterResponse responseRegister =
client.registerApplicationMaster(RegisterApplicationMasterRequest
.newInstance(NetUtils.getHostname(), 1024, ""));
Assert.assertNotNull(responseRegister);
Assert.assertNotNull(responseRegister.getQueue());
Assert.assertNotNull(responseRegister.getApplicationACLs());
Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
Assert
.assertNotNull(responseRegister.getContainersFromPreviousAttempts());
Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
RMApp rmApp =
cluster.getResourceManager().getRMContext().getRMApps().get(appId);
Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
LOG.info("testDistributedSchedulingE2E - Allocate");
AllocateRequest request =
createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
List<ResourceRequest> askList = request.getAskList();
List<ResourceRequest> newAskList = new ArrayList<>(askList);
// Duplicate all ANY requests marking them as opportunistic
for (ResourceRequest rr : askList) {
if (ResourceRequest.ANY.equals(rr.getResourceName())) {
ResourceRequest newRR = ResourceRequest.newInstance(rr
.getPriority(), rr.getResourceName(),
rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
rr.getNodeLabelExpression(),
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true));
newAskList.add(newRR);
}
}
request.setAskList(newAskList);
AllocateResponse allocResponse = client.allocate(request);
Assert.assertNotNull(allocResponse);
// Ensure that all the requests are satisfied immediately
Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
// Verify that the allocated containers are OPPORTUNISTIC
for (Container allocatedContainer : allocResponse
.getAllocatedContainers()) {
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
.newContainerTokenIdentifier(
allocatedContainer.getContainerToken());
Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
containerTokenIdentifier.getExecutionType());
}
request.setAskList(new ArrayList<ResourceRequest>());
request.setResponseId(request.getResponseId() + 1);
Thread.sleep(1000);
// RM should allocate GUARANTEED containers within 2 calls to allocate()
allocResponse = client.allocate(request);
Assert.assertNotNull(allocResponse);
Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
// Verify that the allocated containers are GUARANTEED
for (Container allocatedContainer : allocResponse
.getAllocatedContainers()) {
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
.newContainerTokenIdentifier(
allocatedContainer.getContainerToken());
Assert.assertEquals(ExecutionType.GUARANTEED,
containerTokenIdentifier.getExecutionType());
}
LOG.info("testDistributedSchedulingE2E - Finish");
}
/**
* Validates if AMRMClient can be used with Distributed Scheduling turned on.
*
* @throws Exception
*/
@Test(timeout = 120000)
@SuppressWarnings("unchecked")
public void testAMRMClient() throws Exception {
AMRMClientImpl<AMRMClient.ContainerRequest> amClient = null;
try {
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
cluster.init(conf);
cluster.start();
final Configuration yarnConf = cluster.getConfig();
Priority priority = Priority.newInstance(1);
Priority priority2 = Priority.newInstance(2);
Resource capability = Resource.newInstance(1024, 1);
// the client has to connect to AMRMProxy
List<NodeReport> nodeReports = rmClient.getNodeReports(NodeState.RUNNING);
String node = nodeReports.get(0).getNodeId().getHost();
String rack = nodeReports.get(0).getRackName();
String[] nodes = new String[]{node};
String[] racks = new String[]{rack};
yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
rmClient = YarnClient.createYarnClient();
rmClient.init(yarnConf);
rmClient.start();
// start am rm client
amClient = new AMRMClientImpl(client);
amClient.init(yarnConf);
amClient.start();
amClient.registerApplicationMaster(NetUtils.getHostname(), 1024, "");
// Submit application
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
ApplicationId appId = createApp(rmClient, cluster);
client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
LOG.info("testDistributedSchedulingE2E - Register");
RegisterApplicationMasterResponse responseRegister =
client.registerApplicationMaster(RegisterApplicationMasterRequest
.newInstance(NetUtils.getHostname(), 1024, ""));
Assert.assertNotNull(responseRegister);
Assert.assertNotNull(responseRegister.getQueue());
Assert.assertNotNull(responseRegister.getApplicationACLs());
Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
Assert
.assertNotNull(responseRegister.getContainersFromPreviousAttempts());
Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
RMApp rmApp =
cluster.getResourceManager().getRMContext().getRMApps().get(appId);
Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
LOG.info("testDistributedSchedulingE2E - Allocate");
AllocateRequest request =
createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
List<ResourceRequest> askList = request.getAskList();
List<ResourceRequest> newAskList = new ArrayList<>(askList);
// Duplicate all ANY requests marking them as opportunistic
for (ResourceRequest rr : askList) {
if (ResourceRequest.ANY.equals(rr.getResourceName())) {
ResourceRequest newRR = ResourceRequest.newInstance(rr
.getPriority(), rr.getResourceName(),
rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
rr.getNodeLabelExpression(),
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority2,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true));
newAskList.add(newRR);
ExecutionType.OPPORTUNISTIC, true)));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority2,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority2,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
int containersRequestedNode = amClient.remoteRequestsTable.get(priority,
node, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
int containersRequestedRack = amClient.remoteRequestsTable.get(priority,
rack, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
.remoteRequest.getNumContainers();
int oppContainersRequestedAny =
amClient.remoteRequestsTable.get(priority2, ResourceRequest.ANY,
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
.getNumContainers();
assertEquals(2, containersRequestedNode);
assertEquals(2, containersRequestedRack);
assertEquals(2, containersRequestedAny);
assertEquals(1, oppContainersRequestedAny);
assertEquals(4, amClient.ask.size());
assertEquals(0, amClient.release.size());
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;
int iterationsLeft = 10;
Set<ContainerId> releases = new TreeSet<>();
amClient.getNMTokenCache().clearCache();
Assert.assertEquals(0,
amClient.getNMTokenCache().numberOfTokensInCache());
HashMap<String, Token> receivedNMTokens = new HashMap<>();
while (allocatedContainerCount <
(containersRequestedAny + oppContainersRequestedAny)
&& iterationsLeft-- > 0) {
AllocateResponse allocResponse = amClient.allocate(0.1f);
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
allocatedContainerCount += allocResponse.getAllocatedContainers()
.size();
for (Container container : allocResponse.getAllocatedContainers()) {
ContainerId rejectContainerId = container.getId();
releases.add(rejectContainerId);
}
for (NMToken token : allocResponse.getNMTokens()) {
String nodeID = token.getNodeId().toString();
receivedNMTokens.put(nodeID, token.getToken());
}
if (allocatedContainerCount < containersRequestedAny) {
// sleep to let NM's heartbeat to RM and trigger allocations
sleep(100);
}
}
request.setAskList(newAskList);
AllocateResponse allocResponse = client.allocate(request);
Assert.assertNotNull(allocResponse);
assertEquals(allocatedContainerCount,
containersRequestedAny + oppContainersRequestedAny);
for (ContainerId rejectContainerId : releases) {
amClient.releaseAssignedContainer(rejectContainerId);
}
assertEquals(3, amClient.release.size());
assertEquals(0, amClient.ask.size());
// Ensure that all the requests are satisfied immediately
Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
// need to tell the AMRMClient that we dont need these resources anymore
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
assertEquals(4, amClient.ask.size());
// Verify that the allocated containers are OPPORTUNISTIC
for (Container allocatedContainer : allocResponse
.getAllocatedContainers()) {
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
.newContainerTokenIdentifier(
allocatedContainer.getContainerToken());
Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
containerTokenIdentifier.getExecutionType());
// test RPC exception handling
amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability,
nodes, racks, priority));
amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability,
nodes, racks, priority));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
final AMRMClient amc = amClient;
ApplicationMasterProtocol realRM = amClient.rmClient;
try {
ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol
.class);
when(mockRM.allocate(any(AllocateRequest.class))).thenAnswer(
new Answer<AllocateResponse>() {
public AllocateResponse answer(InvocationOnMock invocation)
throws Exception {
amc.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes,
racks, priority));
amc.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks,
priority));
amc.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null,
priority2, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
throw new Exception();
}
});
amClient.rmClient = mockRM;
amClient.allocate(0.1f);
} catch (Exception ioe) {
} finally {
amClient.rmClient = realRM;
}
request.setAskList(new ArrayList<ResourceRequest>());
request.setResponseId(request.getResponseId() + 1);
assertEquals(3, amClient.release.size());
assertEquals(6, amClient.ask.size());
Thread.sleep(1000);
// RM should allocate GUARANTEED containers within 2 calls to allocate()
allocResponse = client.allocate(request);
Assert.assertNotNull(allocResponse);
Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
// Verify that the allocated containers are GUARANTEED
for (Container allocatedContainer : allocResponse
.getAllocatedContainers()) {
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
.newContainerTokenIdentifier(
allocatedContainer.getContainerToken());
Assert.assertEquals(ExecutionType.GUARANTEED,
containerTokenIdentifier.getExecutionType());
iterationsLeft = 3;
// do a few iterations to ensure RM is not going send new containers
while (iterationsLeft-- > 0) {
// inform RM of rejection
AllocateResponse allocResponse = amClient.allocate(0.1f);
// RM did not send new containers because AM does not need any
assertEquals(0, allocResponse.getAllocatedContainers().size());
if (allocResponse.getCompletedContainersStatuses().size() > 0) {
for (ContainerStatus cStatus : allocResponse
.getCompletedContainersStatuses()) {
if (releases.contains(cStatus.getContainerId())) {
assertEquals(cStatus.getState(), ContainerState.COMPLETE);
assertEquals(-100, cStatus.getExitStatus());
releases.remove(cStatus.getContainerId());
}
}
}
if (iterationsLeft > 0) {
// sleep to make sure NM's heartbeat
sleep(100);
}
}
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
LOG.info("testDistributedSchedulingE2E - Finish");
FinishApplicationMasterResponse responseFinish =
client.finishApplicationMaster(FinishApplicationMasterRequest
.newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
Assert.assertNotNull(responseFinish);
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
} finally {
if (rmClient != null) {
rmClient.stop();
if (amClient != null && amClient.getServiceState() == Service.STATE
.STARTED) {
amClient.stop();
}
cluster.stop();
}
}
@Ignore
@Override
public void testAMRMProxyE2E() throws Exception { }
@Ignore
@Override
public void testE2ETokenRenewal() throws Exception { }
@Ignore
@Override
public void testE2ETokenSwap() throws Exception { }
private void sleep(int sleepTime) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
@ -251,9 +252,9 @@ public class TestNMClient {
racks, priority));
}
int containersRequestedAny = rmClient.remoteRequestsTable.get(priority)
.get(ResourceRequest.ANY).get(capability).remoteRequest
.getNumContainers();
int containersRequestedAny = rmClient.remoteRequestsTable.get(priority,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
.remoteRequest.getNumContainers();
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;

View File

@ -214,7 +214,8 @@ public class ResourceRequestPBImpl extends ResourceRequest {
+ ", # Containers: " + getNumContainers()
+ ", Location: " + getResourceName()
+ ", Relax Locality: " + getRelaxLocality()
+ ", Execution Spec: " + getExecutionTypeRequest() + "}";
+ ", Execution Type Request: " + getExecutionTypeRequest()
+ ", Node Label Expression: " + getNodeLabelExpression() + "}";
}
@Override