Merge r1488485 from trunk for YARN-660. Improve AMRMClient with matching requests (bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1488487 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bikas Saha 2013-06-01 08:31:50 +00:00
parent 4e0ac70cea
commit 395afc7b94
8 changed files with 737 additions and 141 deletions

View File

@ -219,6 +219,8 @@ Release 2.0.5-beta - UNRELEASED
YARN-638. Modified ResourceManager to restore RMDelegationTokens after YARN-638. Modified ResourceManager to restore RMDelegationTokens after
restarting. (Jian He via vinodkv) restarting. (Jian He via vinodkv)
YARN-660. Improve AMRMClient with matching requests (bikas)
OPTIMIZATIONS OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it YARN-512. Log aggregation root directory check is more expensive than it

View File

@ -151,7 +151,7 @@ public class ApplicationMaster {
private YarnRPC rpc; private YarnRPC rpc;
// Handle to communicate with the Resource Manager // Handle to communicate with the Resource Manager
private AMRMClientAsync resourceManager; private AMRMClientAsync<ContainerRequest> resourceManager;
// Application Attempt Id ( combination of attemptId and fail count ) // Application Attempt Id ( combination of attemptId and fail count )
private ApplicationAttemptId appAttemptID; private ApplicationAttemptId appAttemptID;
@ -442,7 +442,9 @@ public boolean run() throws YarnRemoteException, IOException {
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
resourceManager = new AMRMClientAsync(appAttemptID, 1000, allocListener); resourceManager = new AMRMClientAsync<ContainerRequest>(appAttemptID,
1000,
allocListener);
resourceManager.init(conf); resourceManager.init(conf);
resourceManager.start(); resourceManager.start();
@ -522,7 +524,8 @@ private void finish() {
FinalApplicationStatus appStatus; FinalApplicationStatus appStatus;
String appMessage = null; String appMessage = null;
success = true; success = true;
if (numFailedContainers.get() == 0) { if (numFailedContainers.get() == 0 &&
numCompletedContainers.get() == numTotalContainers) {
appStatus = FinalApplicationStatus.SUCCEEDED; appStatus = FinalApplicationStatus.SUCCEEDED;
} else { } else {
appStatus = FinalApplicationStatus.FAILED; appStatus = FinalApplicationStatus.FAILED;
@ -594,11 +597,6 @@ public void onContainersCompleted(List<ContainerStatus> completedContainers) {
resourceManager.addContainerRequest(containerAsk); resourceManager.addContainerRequest(containerAsk);
} }
// set progress to deliver to RM on next heartbeat
float progress = (float) numCompletedContainers.get()
/ numTotalContainers;
resourceManager.setProgress(progress);
if (numCompletedContainers.get() == numTotalContainers) { if (numCompletedContainers.get() == numTotalContainers) {
done = true; done = true;
} }
@ -637,6 +635,19 @@ public void onRebootRequest() {}
@Override @Override
public void onNodesUpdated(List<NodeReport> updatedNodes) {} public void onNodesUpdated(List<NodeReport> updatedNodes) {}
@Override
public float getProgress() {
// set progress to deliver to RM on next heartbeat
float progress = (float) numCompletedContainers.get()
/ numTotalContainers;
return progress;
}
@Override
public void onError(Exception e) {
done = true;
}
} }
/** /**

View File

@ -18,8 +18,9 @@
package org.apache.hadoop.yarn.client; package org.apache.hadoop.yarn.client;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
@ -32,9 +33,11 @@
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.service.Service;
import com.google.common.collect.ImmutableList;
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Unstable @InterfaceStability.Unstable
public interface AMRMClient extends Service { public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Service {
/** /**
* Object to represent container request for resources. * Object to represent container request for resources.
@ -43,20 +46,41 @@ public interface AMRMClient extends Service {
* Can ask for multiple containers of a given type. * Can ask for multiple containers of a given type.
*/ */
public static class ContainerRequest { public static class ContainerRequest {
Resource capability; final Resource capability;
String[] hosts; final ImmutableList<String> hosts;
String[] racks; final ImmutableList<String> racks;
Priority priority; final Priority priority;
int containerCount; final int containerCount;
public ContainerRequest(Resource capability, String[] hosts, public ContainerRequest(Resource capability, String[] hosts,
String[] racks, Priority priority, int containerCount) { String[] racks, Priority priority, int containerCount) {
this.capability = capability; this.capability = capability;
this.hosts = (hosts != null ? hosts.clone() : null); this.hosts = (hosts != null ? ImmutableList.copyOf(hosts) : null);
this.racks = (racks != null ? racks.clone() : null); this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
this.priority = priority; this.priority = priority;
this.containerCount = containerCount; this.containerCount = containerCount;
} }
public Resource getCapability() {
return capability;
}
public ImmutableList<String> getHosts() {
return hosts;
}
public ImmutableList<String> getRacks() {
return racks;
}
public Priority getPriority() {
return priority;
}
public int getContainerCount() {
return containerCount;
}
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("Capability[").append(capability).append("]"); sb.append("Capability[").append(capability).append("]");
@ -65,6 +89,22 @@ public String toString() {
return sb.toString(); return sb.toString();
} }
} }
/**
* This creates a <code>ContainerRequest</code> for 1 container and the
* AMRMClient stores this request internally. <code>getMatchingRequests</code>
* can be used to retrieve these requests from AMRMClient. These requests may
* be matched with an allocated container to determine which request to assign
* the container to. <code>removeContainerRequest</code> must be called using
* the same assigned <code>StoredContainerRequest</code> object so that
* AMRMClient can remove it from its internal store.
*/
public static class StoredContainerRequest extends ContainerRequest {
public StoredContainerRequest(Resource capability, String[] hosts,
String[] racks, Priority priority) {
super(capability, hosts, racks, priority, 1);
}
}
/** /**
* Register the application master. This must be called before any * Register the application master. This must be called before any
@ -117,7 +157,7 @@ public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
* Request containers for resources before calling <code>allocate</code> * Request containers for resources before calling <code>allocate</code>
* @param req Resource request * @param req Resource request
*/ */
public void addContainerRequest(ContainerRequest req); public void addContainerRequest(T req);
/** /**
* Remove previous container request. The previous container request may have * Remove previous container request. The previous container request may have
@ -126,7 +166,7 @@ public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
* even after the remove request * even after the remove request
* @param req Resource request * @param req Resource request
*/ */
public void removeContainerRequest(ContainerRequest req); public void removeContainerRequest(T req);
/** /**
* Release containers assigned by the Resource Manager. If the app cannot use * Release containers assigned by the Resource Manager. If the app cannot use
@ -150,4 +190,21 @@ public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
* @return Current number of nodes in the cluster * @return Current number of nodes in the cluster
*/ */
public int getClusterNodeCount(); public int getClusterNodeCount();
/**
* Get outstanding <code>StoredContainerRequest</code>s matching the given
* parameters. These StoredContainerRequests should have been added via
* <code>addContainerRequest</code> earlier in the lifecycle. For performance,
* the AMRMClient may return its internal collection directly without creating
* a copy. Users should not perform mutable operations on the return value.
* Each collection in the list contains requests with identical
* <code>Resource</code> size that fit in the given capability. In a
* collection, requests will be returned in the same order as they were added.
* @return Collection of request matching the parameters
*/
public List<? extends Collection<T>> getMatchingRequests(
Priority priority,
String resourceName,
Resource capability);
} }

View File

@ -19,9 +19,11 @@
package org.apache.hadoop.yarn.client; package org.apache.hadoop.yarn.client;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -38,7 +40,9 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.AbstractService;
@ -88,55 +92,50 @@
*/ */
@Unstable @Unstable
@Evolving @Evolving
public class AMRMClientAsync extends AbstractService { public class AMRMClientAsync<T extends ContainerRequest> extends AbstractService {
private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class); private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
private final AMRMClient client; private final AMRMClient<T> client;
private final int intervalMs; private final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
private final HeartbeatThread heartbeatThread; private final HeartbeatThread heartbeatThread;
private final CallbackHandlerThread handlerThread; private final CallbackHandlerThread handlerThread;
private final CallbackHandler handler; private final CallbackHandler handler;
private final BlockingQueue<AllocateResponse> responseQueue; private final BlockingQueue<AllocateResponse> responseQueue;
private final Object unregisterHeartbeatLock = new Object();
private volatile boolean keepRunning; private volatile boolean keepRunning;
private volatile float progress; private volatile float progress;
private volatile Exception savedException;
public AMRMClientAsync(ApplicationAttemptId id, int intervalMs, public AMRMClientAsync(ApplicationAttemptId id, int intervalMs,
CallbackHandler callbackHandler) { CallbackHandler callbackHandler) {
this(new AMRMClientImpl(id), intervalMs, callbackHandler); this(new AMRMClientImpl<T>(id), intervalMs, callbackHandler);
} }
@Private @Private
@VisibleForTesting @VisibleForTesting
AMRMClientAsync(AMRMClient client, int intervalMs, public AMRMClientAsync(AMRMClient<T> client, int intervalMs,
CallbackHandler callbackHandler) { CallbackHandler callbackHandler) {
super(AMRMClientAsync.class.getName()); super(AMRMClientAsync.class.getName());
this.client = client; this.client = client;
this.intervalMs = intervalMs; this.heartbeatIntervalMs.set(intervalMs);
handler = callbackHandler; handler = callbackHandler;
heartbeatThread = new HeartbeatThread(); heartbeatThread = new HeartbeatThread();
handlerThread = new CallbackHandlerThread(); handlerThread = new CallbackHandlerThread();
responseQueue = new LinkedBlockingQueue<AllocateResponse>(); responseQueue = new LinkedBlockingQueue<AllocateResponse>();
keepRunning = true; keepRunning = true;
savedException = null;
} }
/**
* Sets the application's current progress. It will be transmitted to the
* resource manager on the next heartbeat.
* @param progress
* the application's progress so far
*/
public void setProgress(float progress) {
this.progress = progress;
}
@Override @Override
public void init(Configuration conf) { public void init(Configuration conf) {
super.init(conf); super.init(conf);
client.init(conf); client.init(conf);
} }
@Override @Override
public void start() { public void start() {
@ -171,6 +170,17 @@ public void stop() {
super.stop(); super.stop();
} }
public void setHeartbeatInterval(int interval) {
heartbeatIntervalMs.set(interval);
}
public List<? extends Collection<T>> getMatchingRequests(
Priority priority,
String resourceName,
Resource capability) {
return client.getMatchingRequests(priority, resourceName, capability);
}
/** /**
* Registers this application master with the resource manager. On successful * Registers this application master with the resource manager. On successful
* registration, starts the heartbeating thread. * registration, starts the heartbeating thread.
@ -180,8 +190,8 @@ public void stop() {
public RegisterApplicationMasterResponse registerApplicationMaster( public RegisterApplicationMasterResponse registerApplicationMaster(
String appHostName, int appHostPort, String appTrackingUrl) String appHostName, int appHostPort, String appTrackingUrl)
throws YarnRemoteException, IOException { throws YarnRemoteException, IOException {
RegisterApplicationMasterResponse response = RegisterApplicationMasterResponse response = client
client.registerApplicationMaster(appHostName, appHostPort, appTrackingUrl); .registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
heartbeatThread.start(); heartbeatThread.start();
return response; return response;
} }
@ -195,8 +205,9 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
* @throws IOException * @throws IOException
*/ */
public void unregisterApplicationMaster(FinalApplicationStatus appStatus, public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
String appMessage, String appTrackingUrl) throws YarnRemoteException, IOException { String appMessage, String appTrackingUrl) throws YarnRemoteException,
synchronized (client) { IOException {
synchronized (unregisterHeartbeatLock) {
keepRunning = false; keepRunning = false;
client.unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl); client.unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
} }
@ -206,7 +217,7 @@ public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
* Request containers for resources before calling <code>allocate</code> * Request containers for resources before calling <code>allocate</code>
* @param req Resource request * @param req Resource request
*/ */
public void addContainerRequest(AMRMClient.ContainerRequest req) { public void addContainerRequest(T req) {
client.addContainerRequest(req); client.addContainerRequest(req);
} }
@ -217,7 +228,7 @@ public void addContainerRequest(AMRMClient.ContainerRequest req) {
* even after the remove request * even after the remove request
* @param req Resource request * @param req Resource request
*/ */
public void removeContainerRequest(AMRMClient.ContainerRequest req) { public void removeContainerRequest(T req) {
client.removeContainerRequest(req); client.removeContainerRequest(req);
} }
@ -259,7 +270,7 @@ public void run() {
while (true) { while (true) {
AllocateResponse response = null; AllocateResponse response = null;
// synchronization ensures we don't send heartbeats after unregistering // synchronization ensures we don't send heartbeats after unregistering
synchronized (client) { synchronized (unregisterHeartbeatLock) {
if (!keepRunning) { if (!keepRunning) {
break; break;
} }
@ -267,9 +278,17 @@ public void run() {
try { try {
response = client.allocate(progress); response = client.allocate(progress);
} catch (YarnRemoteException ex) { } catch (YarnRemoteException ex) {
LOG.error("Failed to heartbeat", ex); LOG.error("Yarn exception on heartbeat", ex);
savedException = ex;
// interrupt handler thread in case it waiting on the queue
handlerThread.interrupt();
break;
} catch (IOException e) { } catch (IOException e) {
LOG.error("Failed to heartbeat", e); LOG.error("IO exception on heartbeat", e);
savedException = e;
// interrupt handler thread in case it waiting on the queue
handlerThread.interrupt();
break;
} }
} }
if (response != null) { if (response != null) {
@ -278,15 +297,15 @@ public void run() {
responseQueue.put(response); responseQueue.put(response);
break; break;
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
LOG.warn("Interrupted while waiting to put on response queue", ex); LOG.info("Interrupted while waiting to put on response queue", ex);
} }
} }
} }
try { try {
Thread.sleep(intervalMs); Thread.sleep(heartbeatIntervalMs.get());
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
LOG.warn("Heartbeater interrupted", ex); LOG.info("Heartbeater interrupted", ex);
} }
} }
} }
@ -301,14 +320,21 @@ public void run() {
while (keepRunning) { while (keepRunning) {
AllocateResponse response; AllocateResponse response;
try { try {
if(savedException != null) {
LOG.error("Stopping callback due to: ", savedException);
handler.onError(savedException);
break;
}
response = responseQueue.take(); response = responseQueue.take();
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
LOG.info("Interrupted while waiting for queue"); LOG.info("Interrupted while waiting for queue", ex);
continue; continue;
} }
if (response.getReboot()) { if (response.getReboot()) {
handler.onRebootRequest(); handler.onRebootRequest();
LOG.info("Reboot requested. Stopping callback.");
break;
} }
List<NodeReport> updatedNodes = response.getUpdatedNodes(); List<NodeReport> updatedNodes = response.getUpdatedNodes();
if (!updatedNodes.isEmpty()) { if (!updatedNodes.isEmpty()) {
@ -325,6 +351,8 @@ public void run() {
if (!allocated.isEmpty()) { if (!allocated.isEmpty()) {
handler.onContainersAllocated(allocated); handler.onContainersAllocated(allocated);
} }
progress = handler.getProgress();
} }
} }
} }
@ -347,14 +375,19 @@ public interface CallbackHandler {
/** /**
* Called when the ResourceManager wants the ApplicationMaster to reboot * Called when the ResourceManager wants the ApplicationMaster to reboot
* for being out of sync. * for being out of sync. The ApplicationMaster should not unregister with
* the RM unless the ApplicationMaster wants to be the last attempt.
*/ */
public void onRebootRequest(); public void onRebootRequest();
/** /**
* Called when nodes tracked by the ResourceManager have changed in in health, * Called when nodes tracked by the ResourceManager have changed in health,
* availability etc. * availability etc.
*/ */
public void onNodesUpdated(List<NodeReport> updatedNodes); public void onNodesUpdated(List<NodeReport> updatedNodes);
public float getProgress();
public void onError(Exception e);
} }
} }

View File

@ -22,9 +22,15 @@
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
@ -47,6 +53,7 @@
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
@ -55,8 +62,11 @@
import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.BuilderUtils;
// TODO check inputs for null etc. YARN-654
@Unstable @Unstable
public class AMRMClientImpl extends AbstractService implements AMRMClient { public class AMRMClientImpl<T extends ContainerRequest>
extends AbstractService implements AMRMClient<T> {
private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class); private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
@ -70,6 +80,57 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient {
protected Resource clusterAvailableResources; protected Resource clusterAvailableResources;
protected int clusterNodeCount; protected int clusterNodeCount;
class ResourceRequestInfo {
ResourceRequest remoteRequest;
LinkedHashSet<T> containerRequests;
ResourceRequestInfo(Priority priority, String resourceName,
Resource capability) {
remoteRequest = BuilderUtils.newResourceRequest(priority, resourceName,
capability, 0);
containerRequests = new LinkedHashSet<T>();
}
}
/**
* Class compares Resource by memory then cpu in reverse order
*/
class ResourceReverseMemoryThenCpuComparator implements Comparator<Resource> {
@Override
public int compare(Resource arg0, Resource arg1) {
int mem0 = arg0.getMemory();
int mem1 = arg1.getMemory();
int cpu0 = arg0.getVirtualCores();
int cpu1 = arg1.getVirtualCores();
if(mem0 == mem1) {
if(cpu0 == cpu1) {
return 0;
}
if(cpu0 < cpu1) {
return 1;
}
return -1;
}
if(mem0 < mem1) {
return 1;
}
return -1;
}
}
static boolean canFit(Resource arg0, Resource arg1) {
int mem0 = arg0.getMemory();
int mem1 = arg1.getMemory();
int cpu0 = arg0.getVirtualCores();
int cpu1 = arg1.getVirtualCores();
if(mem0 <= mem1 && cpu0 <= cpu1) {
return true;
}
return false;
}
//Key -> Priority //Key -> Priority
//Value -> Map //Value -> Map
//Key->ResourceName (e.g., hostname, rackname, *) //Key->ResourceName (e.g., hostname, rackname, *)
@ -77,9 +138,9 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient {
//Key->Resource Capability //Key->Resource Capability
//Value->ResourceRequest //Value->ResourceRequest
protected final protected final
Map<Priority, Map<String, Map<Resource, ResourceRequest>>> Map<Priority, Map<String, TreeMap<Resource, ResourceRequestInfo>>>
remoteRequestsTable = remoteRequestsTable =
new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>(); new TreeMap<Priority, Map<String, TreeMap<Resource, ResourceRequestInfo>>>();
protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>( protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator()); new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator());
@ -223,42 +284,47 @@ public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
} }
@Override @Override
public synchronized void addContainerRequest(ContainerRequest req) { public synchronized void addContainerRequest(T req) {
// Create resource requests // Create resource requests
if(req.hosts != null) { // add check for dup locations
if (req.hosts != null) {
for (String host : req.hosts) { for (String host : req.hosts) {
addResourceRequest(req.priority, host, req.capability, req.containerCount); addResourceRequest(req.priority, host, req.capability,
req.containerCount, req);
} }
} }
if(req.racks != null) { if (req.racks != null) {
for (String rack : req.racks) { for (String rack : req.racks) {
addResourceRequest(req.priority, rack, req.capability, req.containerCount); addResourceRequest(req.priority, rack, req.capability,
req.containerCount, req);
} }
} }
// Off-switch // Off-switch
addResourceRequest(req.priority, ResourceRequest.ANY, req.capability, addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
req.containerCount); req.containerCount, req);
} }
@Override @Override
public synchronized void removeContainerRequest(ContainerRequest req) { public synchronized void removeContainerRequest(T req) {
// Update resource requests // Update resource requests
if(req.hosts != null) { if (req.hosts != null) {
for (String hostName : req.hosts) { for (String hostName : req.hosts) {
decResourceRequest(req.priority, hostName, req.capability, req.containerCount); decResourceRequest(req.priority, hostName, req.capability,
req.containerCount, req);
} }
} }
if(req.racks != null) { if (req.racks != null) {
for (String rack : req.racks) { for (String rack : req.racks) {
decResourceRequest(req.priority, rack, req.capability, req.containerCount); decResourceRequest(req.priority, rack, req.capability,
req.containerCount, req);
} }
} }
decResourceRequest(req.priority, ResourceRequest.ANY, req.capability, decResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
req.containerCount); req.containerCount, req);
} }
@Override @Override
@ -276,6 +342,44 @@ public synchronized int getClusterNodeCount() {
return clusterNodeCount; return clusterNodeCount;
} }
@Override
public synchronized List<? extends Collection<T>> getMatchingRequests(
Priority priority,
String resourceName,
Resource capability) {
List<LinkedHashSet<T>> list = new LinkedList<LinkedHashSet<T>>();
Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
this.remoteRequestsTable.get(priority);
if (remoteRequests == null) {
return list;
}
TreeMap<Resource, ResourceRequestInfo> reqMap = remoteRequests
.get(resourceName);
if (reqMap == null) {
return list;
}
ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
if (resourceRequestInfo != null) {
list.add(resourceRequestInfo.containerRequests);
return list;
}
// no exact match. Container may be larger than what was requested.
// get all resources <= capability. map is reverse sorted.
SortedMap<Resource, ResourceRequestInfo> tailMap =
reqMap.tailMap(capability);
for(Map.Entry<Resource, ResourceRequestInfo> entry : tailMap.entrySet()) {
if(canFit(entry.getKey(), capability)) {
// match found that fits in the larger resource
list.add(entry.getValue().containerRequests);
}
}
// no match found
return list;
}
private void addResourceRequestToAsk(ResourceRequest remoteRequest) { private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
// This code looks weird but is needed because of the following scenario. // This code looks weird but is needed because of the following scenario.
// A ResourceRequest is removed from the remoteRequestTable. A 0 container // A ResourceRequest is removed from the remoteRequestTable. A 0 container
@ -294,44 +398,57 @@ private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
} }
private void addResourceRequest(Priority priority, String resourceName, private void addResourceRequest(Priority priority, String resourceName,
Resource capability, int containerCount) { Resource capability, int containerCount, T req) {
Map<String, Map<Resource, ResourceRequest>> remoteRequests = Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
this.remoteRequestsTable.get(priority); this.remoteRequestsTable.get(priority);
if (remoteRequests == null) { if (remoteRequests == null) {
remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>(); remoteRequests =
new HashMap<String, TreeMap<Resource, ResourceRequestInfo>>();
this.remoteRequestsTable.put(priority, remoteRequests); this.remoteRequestsTable.put(priority, remoteRequests);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Added priority=" + priority); LOG.debug("Added priority=" + priority);
} }
} }
Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName); TreeMap<Resource, ResourceRequestInfo> reqMap =
remoteRequests.get(resourceName);
if (reqMap == null) { if (reqMap == null) {
reqMap = new HashMap<Resource, ResourceRequest>(); // capabilities are stored in reverse sorted order. smallest last.
reqMap = new TreeMap<Resource, ResourceRequestInfo>(
new ResourceReverseMemoryThenCpuComparator());
remoteRequests.put(resourceName, reqMap); remoteRequests.put(resourceName, reqMap);
} }
ResourceRequest remoteRequest = reqMap.get(capability); ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
if (remoteRequest == null) { if (resourceRequestInfo == null) {
remoteRequest = BuilderUtils. resourceRequestInfo =
newResourceRequest(priority, resourceName, capability, 0); new ResourceRequestInfo(priority, resourceName, capability);
reqMap.put(capability, remoteRequest); reqMap.put(capability, resourceRequestInfo);
} }
remoteRequest.setNumContainers(remoteRequest.getNumContainers() + containerCount); resourceRequestInfo.remoteRequest.setNumContainers(
resourceRequestInfo.remoteRequest.getNumContainers() + containerCount);
if(req instanceof StoredContainerRequest) {
resourceRequestInfo.containerRequests.add(req);
}
// Note this down for next interaction with ResourceManager // Note this down for next interaction with ResourceManager
addResourceRequestToAsk(remoteRequest); addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("addResourceRequest:" + " applicationId=" LOG.debug("addResourceRequest:" + " applicationId="
+ appAttemptId + " priority=" + priority.getPriority() + appAttemptId + " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers=" + " resourceName=" + resourceName + " numContainers="
+ remoteRequest.getNumContainers() + " #asks=" + ask.size()); + resourceRequestInfo.remoteRequest.getNumContainers()
+ " #asks=" + ask.size());
} }
} }
private void decResourceRequest(Priority priority, String resourceName, private void decResourceRequest(Priority priority,
Resource capability, int containerCount) { String resourceName,
Map<String, Map<Resource, ResourceRequest>> remoteRequests = Resource capability,
int containerCount,
T req) {
Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
this.remoteRequestsTable.get(priority); this.remoteRequestsTable.get(priority);
if(remoteRequests == null) { if(remoteRequests == null) {
@ -342,7 +459,7 @@ private void decResourceRequest(Priority priority, String resourceName,
return; return;
} }
Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName); Map<Resource, ResourceRequestInfo> reqMap = remoteRequests.get(resourceName);
if (reqMap == null) { if (reqMap == null) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Not decrementing resource as " + resourceName LOG.debug("Not decrementing resource as " + resourceName
@ -350,28 +467,34 @@ private void decResourceRequest(Priority priority, String resourceName,
} }
return; return;
} }
ResourceRequest remoteRequest = reqMap.get(capability); ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("BEFORE decResourceRequest:" + " applicationId=" LOG.debug("BEFORE decResourceRequest:" + " applicationId="
+ appAttemptId + " priority=" + priority.getPriority() + appAttemptId + " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers=" + " resourceName=" + resourceName + " numContainers="
+ remoteRequest.getNumContainers() + " #asks=" + ask.size()); + resourceRequestInfo.remoteRequest.getNumContainers()
+ " #asks=" + ask.size());
} }
remoteRequest. resourceRequestInfo.remoteRequest.setNumContainers(
setNumContainers(remoteRequest.getNumContainers() - containerCount); resourceRequestInfo.remoteRequest.getNumContainers() - containerCount);
if(remoteRequest.getNumContainers() < 0) {
if(req instanceof StoredContainerRequest) {
resourceRequestInfo.containerRequests.remove(req);
}
if(resourceRequestInfo.remoteRequest.getNumContainers() < 0) {
// guard against spurious removals // guard against spurious removals
remoteRequest.setNumContainers(0); resourceRequestInfo.remoteRequest.setNumContainers(0);
} }
// send the ResourceRequest to RM even if is 0 because it needs to override // send the ResourceRequest to RM even if is 0 because it needs to override
// a previously sent value. If ResourceRequest was not sent previously then // a previously sent value. If ResourceRequest was not sent previously then
// sending 0 aught to be a no-op on RM // sending 0 aught to be a no-op on RM
addResourceRequestToAsk(remoteRequest); addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
// delete entries from map if no longer needed // delete entries from map if no longer needed
if (remoteRequest.getNumContainers() == 0) { if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) {
reqMap.remove(capability); reqMap.remove(capability);
if (reqMap.size() == 0) { if (reqMap.size() == 0) {
remoteRequests.remove(resourceName); remoteRequests.remove(resourceName);
@ -385,7 +508,8 @@ private void decResourceRequest(Priority priority, String resourceName,
LOG.info("AFTER decResourceRequest:" + " applicationId=" LOG.info("AFTER decResourceRequest:" + " applicationId="
+ appAttemptId + " priority=" + priority.getPriority() + appAttemptId + " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers=" + " resourceName=" + resourceName + " numContainers="
+ remoteRequest.getNumContainers() + " #asks=" + ask.size()); + resourceRequestInfo.remoteRequest.getNumContainers()
+ " #asks=" + ask.size());
} }
} }

View File

@ -24,6 +24,7 @@
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
@ -50,27 +51,38 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.AMRMClient.StoredContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.service.Service.STATE; import org.apache.hadoop.yarn.service.Service.STATE;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
public class TestAMRMClient { public class TestAMRMClient {
Configuration conf = null; static Configuration conf = null;
MiniYARNCluster yarnCluster = null; static MiniYARNCluster yarnCluster = null;
YarnClientImpl yarnClient = null; static YarnClientImpl yarnClient = null;
List<NodeReport> nodeReports = null; static List<NodeReport> nodeReports = null;
ApplicationAttemptId attemptId = null; static ApplicationAttemptId attemptId = null;
int nodeCount = 3; static int nodeCount = 3;
@Before static Resource capability;
public void setup() throws YarnRemoteException, IOException { static Priority priority;
static String node;
static String rack;
static String[] nodes;
static String[] racks;
@BeforeClass
public static void setup() throws Exception {
// start minicluster // start minicluster
conf = new YarnConfiguration(); conf = new YarnConfiguration();
yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1); yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
@ -84,7 +96,17 @@ public void setup() throws YarnRemoteException, IOException {
// get node info // get node info
nodeReports = yarnClient.getNodeReports(); nodeReports = yarnClient.getNodeReports();
priority = BuilderUtils.newPriority(1);
capability = BuilderUtils.newResource(1024, 1);
node = nodeReports.get(0).getNodeId().getHost();
rack = nodeReports.get(0).getRackName();
nodes = new String[]{ node };
racks = new String[]{ rack };
}
@Before
public void startApp() throws Exception {
// submit new app // submit new app
GetNewApplicationResponse newApp = yarnClient.getNewApplication(); GetNewApplicationResponse newApp = yarnClient.getNewApplication();
ApplicationId appId = newApp.getApplicationId(); ApplicationId appId = newApp.getApplicationId();
@ -125,7 +147,12 @@ public void setup() throws YarnRemoteException, IOException {
} }
@After @After
public void tearDown() { public void cancelApp() {
attemptId = null;
}
@AfterClass
public static void tearDown() {
if (yarnClient != null && yarnClient.getServiceState() == STATE.STARTED) { if (yarnClient != null && yarnClient.getServiceState() == STATE.STARTED) {
yarnClient.stop(); yarnClient.stop();
} }
@ -133,13 +160,235 @@ public void tearDown() {
yarnCluster.stop(); yarnCluster.stop();
} }
} }
@Test (timeout=60000)
public void testAMRMClientMatchingFit() throws YarnRemoteException, IOException {
AMRMClientImpl<StoredContainerRequest> amClient = null;
try {
// start am rm client
amClient = new AMRMClientImpl<StoredContainerRequest>(attemptId);
amClient.init(conf);
amClient.start();
amClient.registerApplicationMaster("Host", 10000, "");
Resource capability1 = BuilderUtils.newResource(1024, 2);
Resource capability2 = BuilderUtils.newResource(1024, 1);
Resource capability3 = BuilderUtils.newResource(1000, 2);
Resource capability4 = BuilderUtils.newResource(2000, 1);
Resource capability5 = BuilderUtils.newResource(1000, 3);
Resource capability6 = BuilderUtils.newResource(2000, 1);
StoredContainerRequest storedContainer1 =
new StoredContainerRequest(capability1, nodes, racks, priority);
StoredContainerRequest storedContainer2 =
new StoredContainerRequest(capability2, nodes, racks, priority);
StoredContainerRequest storedContainer3 =
new StoredContainerRequest(capability3, nodes, racks, priority);
StoredContainerRequest storedContainer4 =
new StoredContainerRequest(capability4, nodes, racks, priority);
StoredContainerRequest storedContainer5 =
new StoredContainerRequest(capability5, nodes, racks, priority);
StoredContainerRequest storedContainer6 =
new StoredContainerRequest(capability6, nodes, racks, priority);
amClient.addContainerRequest(storedContainer1);
amClient.addContainerRequest(storedContainer2);
amClient.addContainerRequest(storedContainer3);
amClient.addContainerRequest(storedContainer4);
amClient.addContainerRequest(storedContainer5);
amClient.addContainerRequest(storedContainer6);
// test matching of containers
List<? extends Collection<StoredContainerRequest>> matches;
StoredContainerRequest storedRequest;
// exact match
Resource testCapability1 = BuilderUtils.newResource(1024, 2);
matches = amClient.getMatchingRequests(priority, node, testCapability1);
verifyMatches(matches, 1);
storedRequest = matches.get(0).iterator().next();
assertTrue(storedContainer1 == storedRequest);
amClient.removeContainerRequest(storedContainer1);
// exact matching with order maintained
Resource testCapability2 = BuilderUtils.newResource(2000, 1);
matches = amClient.getMatchingRequests(priority, node, testCapability2);
verifyMatches(matches, 2);
// must be returned in the order they were made
int i = 0;
for(StoredContainerRequest storedRequest1 : matches.get(0)) {
if(i++ == 0) {
assertTrue(storedContainer4 == storedRequest1);
} else {
assertTrue(storedContainer6 == storedRequest1);
}
}
amClient.removeContainerRequest(storedContainer6);
// matching with larger container. all requests returned
Resource testCapability3 = BuilderUtils.newResource(4000, 4);
matches = amClient.getMatchingRequests(priority, node, testCapability3);
assert(matches.size() == 4);
Resource testCapability4 = BuilderUtils.newResource(1024, 2);
matches = amClient.getMatchingRequests(priority, node, testCapability4);
assert(matches.size() == 2);
// verify non-fitting containers are not returned and fitting ones are
for(Collection<StoredContainerRequest> testSet : matches) {
assertTrue(testSet.size() == 1);
StoredContainerRequest testRequest = testSet.iterator().next();
assertTrue(testRequest != storedContainer4);
assertTrue(testRequest != storedContainer5);
assert(testRequest == storedContainer2 ||
testRequest == storedContainer3);
}
Resource testCapability5 = BuilderUtils.newResource(512, 4);
matches = amClient.getMatchingRequests(priority, node, testCapability5);
assert(matches.size() == 0);
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
} finally {
if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
amClient.stop();
}
}
}
private void verifyMatches(
List<? extends Collection<StoredContainerRequest>> matches,
int matchSize) {
assertTrue(matches.size() == 1);
assertTrue(matches.get(0).size() == matchSize);
}
@Test (timeout=60000)
public void testAMRMClientMatchStorage() throws YarnRemoteException, IOException {
AMRMClientImpl<StoredContainerRequest> amClient = null;
try {
// start am rm client
amClient = new AMRMClientImpl<StoredContainerRequest>(attemptId);
amClient.init(conf);
amClient.start();
amClient.registerApplicationMaster("Host", 10000, "");
Priority priority1 = Records.newRecord(Priority.class);
priority1.setPriority(2);
StoredContainerRequest storedContainer1 =
new StoredContainerRequest(capability, nodes, racks, priority);
StoredContainerRequest storedContainer2 =
new StoredContainerRequest(capability, nodes, racks, priority);
StoredContainerRequest storedContainer3 =
new StoredContainerRequest(capability, null, null, priority1);
amClient.addContainerRequest(storedContainer1);
amClient.addContainerRequest(storedContainer2);
amClient.addContainerRequest(storedContainer3);
// test addition and storage
int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
.get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
assertTrue(containersRequestedAny == 2);
containersRequestedAny = amClient.remoteRequestsTable.get(priority1)
.get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
assertTrue(containersRequestedAny == 1);
List<? extends Collection<StoredContainerRequest>> matches =
amClient.getMatchingRequests(priority, node, capability);
verifyMatches(matches, 2);
matches = amClient.getMatchingRequests(priority, rack, capability);
verifyMatches(matches, 2);
matches =
amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability);
verifyMatches(matches, 2);
matches = amClient.getMatchingRequests(priority1, rack, capability);
assertTrue(matches.isEmpty());
matches =
amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability);
verifyMatches(matches, 1);
// test removal
amClient.removeContainerRequest(storedContainer3);
matches = amClient.getMatchingRequests(priority, node, capability);
verifyMatches(matches, 2);
amClient.removeContainerRequest(storedContainer2);
matches = amClient.getMatchingRequests(priority, node, capability);
verifyMatches(matches, 1);
matches = amClient.getMatchingRequests(priority, rack, capability);
verifyMatches(matches, 1);
// test matching of containers
StoredContainerRequest storedRequest = matches.get(0).iterator().next();
assertTrue(storedContainer1 == storedRequest);
amClient.removeContainerRequest(storedContainer1);
matches =
amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability);
assertTrue(matches.isEmpty());
matches =
amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability);
assertTrue(matches.isEmpty());
// 0 requests left. everything got cleaned up
assertTrue(amClient.remoteRequestsTable.isEmpty());
// go through an exemplary allocation, matching and release cycle
amClient.addContainerRequest(storedContainer1);
amClient.addContainerRequest(storedContainer3);
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;
int iterationsLeft = 2;
while (allocatedContainerCount < 2
&& iterationsLeft-- > 0) {
AllocateResponse allocResponse = amClient.allocate(0.1f);
assertTrue(amClient.ask.size() == 0);
assertTrue(amClient.release.size() == 0);
assertTrue(nodeCount == amClient.getClusterNodeCount());
allocatedContainerCount += allocResponse.getAllocatedContainers().size();
for(Container container : allocResponse.getAllocatedContainers()) {
ContainerRequest expectedRequest =
container.getPriority().equals(storedContainer1.getPriority()) ?
storedContainer1 : storedContainer3;
matches = amClient.getMatchingRequests(container.getPriority(),
ResourceRequest.ANY,
container.getResource());
// test correct matched container is returned
verifyMatches(matches, 1);
ContainerRequest matchedRequest = matches.get(0).iterator().next();
assertTrue(matchedRequest == expectedRequest);
// assign this container, use it and release it
amClient.releaseAssignedContainer(container.getId());
}
if(allocatedContainerCount < containersRequestedAny) {
// sleep to let NM's heartbeat to RM and trigger allocations
sleep(1000);
}
}
assertTrue(allocatedContainerCount == 2);
assertTrue(amClient.release.size() == 2);
assertTrue(amClient.ask.size() == 0);
AllocateResponse allocResponse = amClient.allocate(0.1f);
assertTrue(amClient.release.size() == 0);
assertTrue(amClient.ask.size() == 0);
assertTrue(allocResponse.getAllocatedContainers().size() == 0);
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
} finally {
if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
amClient.stop();
}
}
}
@Test (timeout=60000) @Test (timeout=60000)
public void testAMRMClient() throws YarnRemoteException, IOException { public void testAMRMClient() throws YarnRemoteException, IOException {
AMRMClientImpl amClient = null; AMRMClientImpl<ContainerRequest> amClient = null;
try { try {
// start am rm client // start am rm client
amClient = new AMRMClientImpl(attemptId); amClient = new AMRMClientImpl<ContainerRequest>(attemptId);
amClient.init(conf); amClient.init(conf);
amClient.start(); amClient.start();
@ -156,36 +405,27 @@ public void testAMRMClient() throws YarnRemoteException, IOException {
} }
} }
} }
private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
private void testAllocation(final AMRMClientImpl amClient)
throws YarnRemoteException, IOException { throws YarnRemoteException, IOException {
// setup container request // setup container request
final Resource capability = Records.newRecord(Resource.class);
final Priority priority = Records.newRecord(Priority.class);
priority.setPriority(0);
capability.setMemory(1024);
String node = nodeReports.get(0).getNodeId().getHost();
String rack = nodeReports.get(0).getRackName();
final String[] nodes = { node };
final String[] racks = { rack };
assertTrue(amClient.ask.size() == 0); assertTrue(amClient.ask.size() == 0);
assertTrue(amClient.release.size() == 0); assertTrue(amClient.release.size() == 0);
amClient.addContainerRequest(new ContainerRequest(capability, nodes, amClient.addContainerRequest(
racks, priority, 1)); new ContainerRequest(capability, nodes, racks, priority, 1));
amClient.addContainerRequest(new ContainerRequest(capability, nodes, amClient.addContainerRequest(
racks, priority, 3)); new ContainerRequest(capability, nodes, racks, priority, 3));
amClient.removeContainerRequest(new ContainerRequest(capability, nodes, amClient.removeContainerRequest(
racks, priority, 2)); new ContainerRequest(capability, nodes, racks, priority, 2));
int containersRequestedNode = amClient.remoteRequestsTable.get(priority) int containersRequestedNode = amClient.remoteRequestsTable.get(priority)
.get(node).get(capability).getNumContainers(); .get(node).get(capability).remoteRequest.getNumContainers();
int containersRequestedRack = amClient.remoteRequestsTable.get(priority) int containersRequestedRack = amClient.remoteRequestsTable.get(priority)
.get(rack).get(capability).getNumContainers(); .get(rack).get(capability).remoteRequest.getNumContainers();
int containersRequestedAny = amClient.remoteRequestsTable.get(priority) int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
.get(ResourceRequest.ANY).get(capability).getNumContainers(); .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
assertTrue(containersRequestedNode == 2); assertTrue(containersRequestedNode == 2);
assertTrue(containersRequestedRack == 2); assertTrue(containersRequestedRack == 2);
@ -221,8 +461,8 @@ private void testAllocation(final AMRMClientImpl amClient)
assertTrue(amClient.ask.size() == 0); assertTrue(amClient.ask.size() == 0);
// need to tell the AMRMClient that we dont need these resources anymore // need to tell the AMRMClient that we dont need these resources anymore
amClient.removeContainerRequest(new ContainerRequest(capability, nodes, amClient.removeContainerRequest(
racks, priority, 2)); new ContainerRequest(capability, nodes, racks, priority, 2));
assertTrue(amClient.ask.size() == 3); assertTrue(amClient.ask.size() == 3);
// send 0 container count request for resources that are no longer needed // send 0 container count request for resources that are no longer needed
ResourceRequest snoopRequest = amClient.ask.iterator().next(); ResourceRequest snoopRequest = amClient.ask.iterator().next();
@ -241,8 +481,9 @@ private void testAllocation(final AMRMClientImpl amClient)
new Answer<AllocateResponse>() { new Answer<AllocateResponse>() {
public AllocateResponse answer(InvocationOnMock invocation) public AllocateResponse answer(InvocationOnMock invocation)
throws Exception { throws Exception {
amClient.removeContainerRequest(new ContainerRequest(capability, amClient.removeContainerRequest(
nodes, racks, priority, 2)); new ContainerRequest(capability, nodes,
racks, priority, 2));
throw new Exception(); throw new Exception();
} }
}); });

View File

@ -28,6 +28,7 @@
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert; import junit.framework.Assert;
@ -39,6 +40,9 @@
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test; import org.junit.Test;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
@ -48,9 +52,11 @@ public class TestAMRMClientAsync {
private static final Log LOG = LogFactory.getLog(TestAMRMClientAsync.class); private static final Log LOG = LogFactory.getLog(TestAMRMClientAsync.class);
@SuppressWarnings("unchecked")
@Test(timeout=10000) @Test(timeout=10000)
public void testAMRMClientAsync() throws Exception { public void testAMRMClientAsync() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
final AtomicBoolean heartbeatBlock = new AtomicBoolean(true);
List<ContainerStatus> completed1 = Arrays.asList( List<ContainerStatus> completed1 = Arrays.asList(
BuilderUtils.newContainerStatus( BuilderUtils.newContainerStatus(
BuilderUtils.newContainerId(0, 0, 0, 0), BuilderUtils.newContainerId(0, 0, 0, 0),
@ -65,20 +71,38 @@ public void testAMRMClientAsync() throws Exception {
new ArrayList<ContainerStatus>(), new ArrayList<Container>()); new ArrayList<ContainerStatus>(), new ArrayList<Container>());
TestCallbackHandler callbackHandler = new TestCallbackHandler(); TestCallbackHandler callbackHandler = new TestCallbackHandler();
AMRMClient client = mock(AMRMClient.class); final AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
final AtomicBoolean secondHeartbeatReceived = new AtomicBoolean(false); final AtomicInteger secondHeartbeatSync = new AtomicInteger(0);
when(client.allocate(anyFloat())).thenReturn(response1).thenAnswer(new Answer<AllocateResponse>() { when(client.allocate(anyFloat())).thenReturn(response1).thenAnswer(new Answer<AllocateResponse>() {
@Override @Override
public AllocateResponse answer(InvocationOnMock invocation) public AllocateResponse answer(InvocationOnMock invocation)
throws Throwable { throws Throwable {
secondHeartbeatReceived.set(true); secondHeartbeatSync.incrementAndGet();
while(heartbeatBlock.get()) {
synchronized(heartbeatBlock) {
heartbeatBlock.wait();
}
}
secondHeartbeatSync.incrementAndGet();
return response2; return response2;
} }
}).thenReturn(emptyResponse); }).thenReturn(emptyResponse);
when(client.registerApplicationMaster(anyString(), anyInt(), anyString())) when(client.registerApplicationMaster(anyString(), anyInt(), anyString()))
.thenReturn(null); .thenReturn(null);
when(client.getClusterAvailableResources()).thenAnswer(new Answer<Resource>() {
@Override
public Resource answer(InvocationOnMock invocation)
throws Throwable {
// take client lock to simulate behavior of real impl
synchronized (client) {
Thread.sleep(10);
}
return null;
}
});
AMRMClientAsync asyncClient = new AMRMClientAsync(client, 20, callbackHandler); AMRMClientAsync<ContainerRequest> asyncClient =
new AMRMClientAsync<ContainerRequest>(client, 20, callbackHandler);
asyncClient.init(conf); asyncClient.init(conf);
asyncClient.start(); asyncClient.start();
asyncClient.registerApplicationMaster("localhost", 1234, null); asyncClient.registerApplicationMaster("localhost", 1234, null);
@ -86,10 +110,21 @@ public AllocateResponse answer(InvocationOnMock invocation)
// while the CallbackHandler will still only be processing the first response, // while the CallbackHandler will still only be processing the first response,
// heartbeater thread should still be sending heartbeats. // heartbeater thread should still be sending heartbeats.
// To test this, wait for the second heartbeat to be received. // To test this, wait for the second heartbeat to be received.
while (!secondHeartbeatReceived.get()) { while (secondHeartbeatSync.get() < 1) {
Thread.sleep(10); Thread.sleep(10);
} }
// heartbeat will be blocked. make sure we can call client methods at this
// time. Checks that heartbeat is not holding onto client lock
assert(secondHeartbeatSync.get() < 2);
asyncClient.getClusterAvailableResources();
// method returned. now unblock heartbeat
assert(secondHeartbeatSync.get() < 2);
synchronized (heartbeatBlock) {
heartbeatBlock.set(false);
heartbeatBlock.notifyAll();
}
// allocated containers should come before completed containers // allocated containers should come before completed containers
Assert.assertEquals(null, callbackHandler.takeCompletedContainers()); Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
@ -110,6 +145,73 @@ public AllocateResponse answer(InvocationOnMock invocation)
Assert.assertEquals(null, callbackHandler.takeCompletedContainers()); Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
} }
@Test(timeout=10000)
public void testAMRMClientAsyncException() throws Exception {
Configuration conf = new Configuration();
TestCallbackHandler callbackHandler = new TestCallbackHandler();
@SuppressWarnings("unchecked")
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
String exStr = "TestException";
YarnRemoteException mockException = mock(YarnRemoteException.class);
when(mockException.getMessage()).thenReturn(exStr);
when(client.allocate(anyFloat())).thenThrow(mockException);
AMRMClientAsync<ContainerRequest> asyncClient =
new AMRMClientAsync<ContainerRequest>(client, 20, callbackHandler);
asyncClient.init(conf);
asyncClient.start();
synchronized (callbackHandler.notifier) {
asyncClient.registerApplicationMaster("localhost", 1234, null);
while(callbackHandler.savedException == null) {
try {
callbackHandler.notifier.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Assert.assertTrue(callbackHandler.savedException.getMessage().contains(exStr));
asyncClient.stop();
// stopping should have joined all threads and completed all callbacks
Assert.assertTrue(callbackHandler.callbackCount == 0);
}
@Test(timeout=10000)
public void testAMRMClientAsyncReboot() throws Exception {
Configuration conf = new Configuration();
TestCallbackHandler callbackHandler = new TestCallbackHandler();
@SuppressWarnings("unchecked")
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
final AllocateResponse rebootResponse = createAllocateResponse(
new ArrayList<ContainerStatus>(), new ArrayList<Container>());
rebootResponse.setReboot(true);
when(client.allocate(anyFloat())).thenReturn(rebootResponse);
AMRMClientAsync<ContainerRequest> asyncClient =
new AMRMClientAsync<ContainerRequest>(client, 20, callbackHandler);
asyncClient.init(conf);
asyncClient.start();
synchronized (callbackHandler.notifier) {
asyncClient.registerApplicationMaster("localhost", 1234, null);
while(callbackHandler.reboot == false) {
try {
callbackHandler.notifier.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
asyncClient.stop();
// stopping should have joined all threads and completed all callbacks
Assert.assertTrue(callbackHandler.callbackCount == 0);
}
private AllocateResponse createAllocateResponse( private AllocateResponse createAllocateResponse(
List<ContainerStatus> completed, List<Container> allocated) { List<ContainerStatus> completed, List<Container> allocated) {
AllocateResponse response = BuilderUtils.newAllocateResponse(0, completed, allocated, AllocateResponse response = BuilderUtils.newAllocateResponse(0, completed, allocated,
@ -120,6 +222,11 @@ private AllocateResponse createAllocateResponse(
private class TestCallbackHandler implements AMRMClientAsync.CallbackHandler { private class TestCallbackHandler implements AMRMClientAsync.CallbackHandler {
private volatile List<ContainerStatus> completedContainers; private volatile List<ContainerStatus> completedContainers;
private volatile List<Container> allocatedContainers; private volatile List<Container> allocatedContainers;
Exception savedException = null;
boolean reboot = false;
Object notifier = new Object();
int callbackCount = 0;
public List<ContainerStatus> takeCompletedContainers() { public List<ContainerStatus> takeCompletedContainers() {
List<ContainerStatus> ret = completedContainers; List<ContainerStatus> ret = completedContainers;
@ -176,9 +283,28 @@ public void onContainersAllocated(List<Container> containers) {
} }
@Override @Override
public void onRebootRequest() {} public void onRebootRequest() {
reboot = true;
synchronized (notifier) {
notifier.notifyAll();
}
}
@Override @Override
public void onNodesUpdated(List<NodeReport> updatedNodes) {} public void onNodesUpdated(List<NodeReport> updatedNodes) {}
@Override
public float getProgress() {
callbackCount++;
return 0.5f;
}
@Override
public void onError(Exception e) {
savedException = e;
synchronized (notifier) {
notifier.notifyAll();
}
}
} }
} }

View File

@ -64,7 +64,7 @@ public class TestNMClient {
Configuration conf = null; Configuration conf = null;
MiniYARNCluster yarnCluster = null; MiniYARNCluster yarnCluster = null;
YarnClientImpl yarnClient = null; YarnClientImpl yarnClient = null;
AMRMClientImpl rmClient = null; AMRMClientImpl<ContainerRequest> rmClient = null;
NMClientImpl nmClient = null; NMClientImpl nmClient = null;
List<NodeReport> nodeReports = null; List<NodeReport> nodeReports = null;
ApplicationAttemptId attemptId = null; ApplicationAttemptId attemptId = null;
@ -136,7 +136,7 @@ public void setup() throws YarnRemoteException, IOException {
} }
// start am rm client // start am rm client
rmClient = new AMRMClientImpl(attemptId); rmClient = new AMRMClientImpl<ContainerRequest>(attemptId);
rmClient.init(conf); rmClient.init(conf);
rmClient.start(); rmClient.start();
assertNotNull(rmClient); assertNotNull(rmClient);
@ -185,7 +185,8 @@ public void testNMClient()
null, null); null, null);
} }
private Set<Container> allocateContainers(AMRMClientImpl rmClient, int num) private Set<Container> allocateContainers(
AMRMClientImpl<ContainerRequest> rmClient, int num)
throws YarnRemoteException, IOException { throws YarnRemoteException, IOException {
// setup container request // setup container request
Resource capability = Resource.newInstance(1024, 0); Resource capability = Resource.newInstance(1024, 0);
@ -201,7 +202,8 @@ private Set<Container> allocateContainers(AMRMClientImpl rmClient, int num)
} }
int containersRequestedAny = rmClient.remoteRequestsTable.get(priority) int containersRequestedAny = rmClient.remoteRequestsTable.get(priority)
.get(ResourceRequest.ANY).get(capability).getNumContainers(); .get(ResourceRequest.ANY).get(capability).remoteRequest
.getNumContainers();
// RM should allocate container within 2 calls to allocate() // RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0; int allocatedContainerCount = 0;