From 3492f5eff1a22aba0d09d72a9dfd3353525c072e Mon Sep 17 00:00:00 2001 From: Bikas Saha Date: Sat, 1 Jun 2013 08:23:30 +0000 Subject: [PATCH] YARN-660. Improve AMRMClient with matching requests (bikas) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1488485 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 2 + .../distributedshell/ApplicationMaster.java | 27 +- .../apache/hadoop/yarn/client/AMRMClient.java | 79 ++++- .../hadoop/yarn/client/AMRMClientAsync.java | 99 ++++-- .../hadoop/yarn/client/AMRMClientImpl.java | 210 +++++++++--- .../hadoop/yarn/client/TestAMRMClient.java | 313 ++++++++++++++++-- .../yarn/client/TestAMRMClientAsync.java | 138 +++++++- .../hadoop/yarn/client/TestNMClient.java | 10 +- 8 files changed, 737 insertions(+), 141 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 7aa4cbc23e4..3af104aea47 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -239,6 +239,8 @@ Release 2.0.5-beta - UNRELEASED YARN-638. Modified ResourceManager to restore RMDelegationTokens after restarting. (Jian He via vinodkv) + YARN-660. Improve AMRMClient with matching requests (bikas) + OPTIMIZATIONS YARN-512. Log aggregation root directory check is more expensive than it diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 2f9f3596250..88dcffd5c99 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -151,7 +151,7 @@ public class ApplicationMaster { private YarnRPC rpc; // Handle to communicate with the Resource Manager - private AMRMClientAsync resourceManager; + private AMRMClientAsync resourceManager; // Application Attempt Id ( combination of attemptId and fail count ) private ApplicationAttemptId appAttemptID; @@ -442,7 +442,9 @@ public class ApplicationMaster { AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); - resourceManager = new AMRMClientAsync(appAttemptID, 1000, allocListener); + resourceManager = new AMRMClientAsync(appAttemptID, + 1000, + allocListener); resourceManager.init(conf); resourceManager.start(); @@ -522,7 +524,8 @@ public class ApplicationMaster { FinalApplicationStatus appStatus; String appMessage = null; success = true; - if (numFailedContainers.get() == 0) { + if (numFailedContainers.get() == 0 && + numCompletedContainers.get() == numTotalContainers) { appStatus = FinalApplicationStatus.SUCCEEDED; } else { appStatus = FinalApplicationStatus.FAILED; @@ -594,11 +597,6 @@ public class ApplicationMaster { resourceManager.addContainerRequest(containerAsk); } - // set progress to deliver to RM on next heartbeat - float progress = (float) numCompletedContainers.get() - / numTotalContainers; - resourceManager.setProgress(progress); - if (numCompletedContainers.get() == numTotalContainers) { done = true; } @@ -637,6 +635,19 @@ public class ApplicationMaster { @Override public void onNodesUpdated(List updatedNodes) {} + + @Override + public float getProgress() { + // set progress to deliver to RM on next heartbeat + float progress = (float) numCompletedContainers.get() + / numTotalContainers; + return progress; + } + + @Override + public void onError(Exception e) { + done = true; + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java index 9bc8c5fae21..e56d5c3fb8e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java @@ -18,8 +18,9 @@ package org.apache.hadoop.yarn.client; - import java.io.IOException; +import java.util.Collection; +import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -32,9 +33,11 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.service.Service; +import com.google.common.collect.ImmutableList; + @InterfaceAudience.Public @InterfaceStability.Unstable -public interface AMRMClient extends Service { +public interface AMRMClient extends Service { /** * Object to represent container request for resources. @@ -43,20 +46,41 @@ public interface AMRMClient extends Service { * Can ask for multiple containers of a given type. */ public static class ContainerRequest { - Resource capability; - String[] hosts; - String[] racks; - Priority priority; - int containerCount; + final Resource capability; + final ImmutableList hosts; + final ImmutableList racks; + final Priority priority; + final int containerCount; public ContainerRequest(Resource capability, String[] hosts, String[] racks, Priority priority, int containerCount) { this.capability = capability; - this.hosts = (hosts != null ? hosts.clone() : null); - this.racks = (racks != null ? racks.clone() : null); + this.hosts = (hosts != null ? ImmutableList.copyOf(hosts) : null); + this.racks = (racks != null ? ImmutableList.copyOf(racks) : null); this.priority = priority; this.containerCount = containerCount; } + + public Resource getCapability() { + return capability; + } + + public ImmutableList getHosts() { + return hosts; + } + + public ImmutableList getRacks() { + return racks; + } + + public Priority getPriority() { + return priority; + } + + public int getContainerCount() { + return containerCount; + } + public String toString() { StringBuilder sb = new StringBuilder(); sb.append("Capability[").append(capability).append("]"); @@ -65,6 +89,22 @@ public interface AMRMClient extends Service { return sb.toString(); } } + + /** + * This creates a ContainerRequest for 1 container and the + * AMRMClient stores this request internally. getMatchingRequests + * can be used to retrieve these requests from AMRMClient. These requests may + * be matched with an allocated container to determine which request to assign + * the container to. removeContainerRequest must be called using + * the same assigned StoredContainerRequest object so that + * AMRMClient can remove it from its internal store. + */ + public static class StoredContainerRequest extends ContainerRequest { + public StoredContainerRequest(Resource capability, String[] hosts, + String[] racks, Priority priority) { + super(capability, hosts, racks, priority, 1); + } + } /** * Register the application master. This must be called before any @@ -117,7 +157,7 @@ public interface AMRMClient extends Service { * Request containers for resources before calling allocate * @param req Resource request */ - public void addContainerRequest(ContainerRequest req); + public void addContainerRequest(T req); /** * Remove previous container request. The previous container request may have @@ -126,7 +166,7 @@ public interface AMRMClient extends Service { * even after the remove request * @param req Resource request */ - public void removeContainerRequest(ContainerRequest req); + public void removeContainerRequest(T req); /** * Release containers assigned by the Resource Manager. If the app cannot use @@ -150,4 +190,21 @@ public interface AMRMClient extends Service { * @return Current number of nodes in the cluster */ public int getClusterNodeCount(); + + /** + * Get outstanding StoredContainerRequests matching the given + * parameters. These StoredContainerRequests should have been added via + * addContainerRequest earlier in the lifecycle. For performance, + * the AMRMClient may return its internal collection directly without creating + * a copy. Users should not perform mutable operations on the return value. + * Each collection in the list contains requests with identical + * Resource size that fit in the given capability. In a + * collection, requests will be returned in the same order as they were added. + * @return Collection of request matching the parameters + */ + public List> getMatchingRequests( + Priority priority, + String resourceName, + Resource capability); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java index 649c4eaf671..02520d91ff1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java @@ -19,9 +19,11 @@ package org.apache.hadoop.yarn.client; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -38,7 +40,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.service.AbstractService; @@ -88,55 +92,50 @@ import com.google.common.annotations.VisibleForTesting; */ @Unstable @Evolving -public class AMRMClientAsync extends AbstractService { +public class AMRMClientAsync extends AbstractService { private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class); - private final AMRMClient client; - private final int intervalMs; + private final AMRMClient client; + private final AtomicInteger heartbeatIntervalMs = new AtomicInteger(); private final HeartbeatThread heartbeatThread; private final CallbackHandlerThread handlerThread; private final CallbackHandler handler; private final BlockingQueue responseQueue; + private final Object unregisterHeartbeatLock = new Object(); + private volatile boolean keepRunning; private volatile float progress; + private volatile Exception savedException; + public AMRMClientAsync(ApplicationAttemptId id, int intervalMs, CallbackHandler callbackHandler) { - this(new AMRMClientImpl(id), intervalMs, callbackHandler); + this(new AMRMClientImpl(id), intervalMs, callbackHandler); } @Private @VisibleForTesting - AMRMClientAsync(AMRMClient client, int intervalMs, + public AMRMClientAsync(AMRMClient client, int intervalMs, CallbackHandler callbackHandler) { super(AMRMClientAsync.class.getName()); this.client = client; - this.intervalMs = intervalMs; + this.heartbeatIntervalMs.set(intervalMs); handler = callbackHandler; heartbeatThread = new HeartbeatThread(); handlerThread = new CallbackHandlerThread(); responseQueue = new LinkedBlockingQueue(); keepRunning = true; + savedException = null; } - - /** - * Sets the application's current progress. It will be transmitted to the - * resource manager on the next heartbeat. - * @param progress - * the application's progress so far - */ - public void setProgress(float progress) { - this.progress = progress; - } - + @Override public void init(Configuration conf) { super.init(conf); client.init(conf); - } + } @Override public void start() { @@ -171,6 +170,17 @@ public class AMRMClientAsync extends AbstractService { super.stop(); } + public void setHeartbeatInterval(int interval) { + heartbeatIntervalMs.set(interval); + } + + public List> getMatchingRequests( + Priority priority, + String resourceName, + Resource capability) { + return client.getMatchingRequests(priority, resourceName, capability); + } + /** * Registers this application master with the resource manager. On successful * registration, starts the heartbeating thread. @@ -180,8 +190,8 @@ public class AMRMClientAsync extends AbstractService { public RegisterApplicationMasterResponse registerApplicationMaster( String appHostName, int appHostPort, String appTrackingUrl) throws YarnRemoteException, IOException { - RegisterApplicationMasterResponse response = - client.registerApplicationMaster(appHostName, appHostPort, appTrackingUrl); + RegisterApplicationMasterResponse response = client + .registerApplicationMaster(appHostName, appHostPort, appTrackingUrl); heartbeatThread.start(); return response; } @@ -195,8 +205,9 @@ public class AMRMClientAsync extends AbstractService { * @throws IOException */ public void unregisterApplicationMaster(FinalApplicationStatus appStatus, - String appMessage, String appTrackingUrl) throws YarnRemoteException, IOException { - synchronized (client) { + String appMessage, String appTrackingUrl) throws YarnRemoteException, + IOException { + synchronized (unregisterHeartbeatLock) { keepRunning = false; client.unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl); } @@ -206,7 +217,7 @@ public class AMRMClientAsync extends AbstractService { * Request containers for resources before calling allocate * @param req Resource request */ - public void addContainerRequest(AMRMClient.ContainerRequest req) { + public void addContainerRequest(T req) { client.addContainerRequest(req); } @@ -217,7 +228,7 @@ public class AMRMClientAsync extends AbstractService { * even after the remove request * @param req Resource request */ - public void removeContainerRequest(AMRMClient.ContainerRequest req) { + public void removeContainerRequest(T req) { client.removeContainerRequest(req); } @@ -259,7 +270,7 @@ public class AMRMClientAsync extends AbstractService { while (true) { AllocateResponse response = null; // synchronization ensures we don't send heartbeats after unregistering - synchronized (client) { + synchronized (unregisterHeartbeatLock) { if (!keepRunning) { break; } @@ -267,9 +278,17 @@ public class AMRMClientAsync extends AbstractService { try { response = client.allocate(progress); } catch (YarnRemoteException ex) { - LOG.error("Failed to heartbeat", ex); + LOG.error("Yarn exception on heartbeat", ex); + savedException = ex; + // interrupt handler thread in case it waiting on the queue + handlerThread.interrupt(); + break; } catch (IOException e) { - LOG.error("Failed to heartbeat", e); + LOG.error("IO exception on heartbeat", e); + savedException = e; + // interrupt handler thread in case it waiting on the queue + handlerThread.interrupt(); + break; } } if (response != null) { @@ -278,15 +297,15 @@ public class AMRMClientAsync extends AbstractService { responseQueue.put(response); break; } catch (InterruptedException ex) { - LOG.warn("Interrupted while waiting to put on response queue", ex); + LOG.info("Interrupted while waiting to put on response queue", ex); } } } try { - Thread.sleep(intervalMs); + Thread.sleep(heartbeatIntervalMs.get()); } catch (InterruptedException ex) { - LOG.warn("Heartbeater interrupted", ex); + LOG.info("Heartbeater interrupted", ex); } } } @@ -301,14 +320,21 @@ public class AMRMClientAsync extends AbstractService { while (keepRunning) { AllocateResponse response; try { + if(savedException != null) { + LOG.error("Stopping callback due to: ", savedException); + handler.onError(savedException); + break; + } response = responseQueue.take(); } catch (InterruptedException ex) { - LOG.info("Interrupted while waiting for queue"); + LOG.info("Interrupted while waiting for queue", ex); continue; } if (response.getReboot()) { handler.onRebootRequest(); + LOG.info("Reboot requested. Stopping callback."); + break; } List updatedNodes = response.getUpdatedNodes(); if (!updatedNodes.isEmpty()) { @@ -325,6 +351,8 @@ public class AMRMClientAsync extends AbstractService { if (!allocated.isEmpty()) { handler.onContainersAllocated(allocated); } + + progress = handler.getProgress(); } } } @@ -347,14 +375,19 @@ public class AMRMClientAsync extends AbstractService { /** * Called when the ResourceManager wants the ApplicationMaster to reboot - * for being out of sync. + * for being out of sync. The ApplicationMaster should not unregister with + * the RM unless the ApplicationMaster wants to be the last attempt. */ public void onRebootRequest(); /** - * Called when nodes tracked by the ResourceManager have changed in in health, + * Called when nodes tracked by the ResourceManager have changed in health, * availability etc. */ public void onNodesUpdated(List updatedNodes); + + public float getProgress(); + + public void onError(Exception e); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java index 350a1cbb2b7..be851a54230 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java @@ -22,9 +22,15 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.security.PrivilegedAction; import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; @@ -47,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -55,8 +62,11 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.util.BuilderUtils; +// TODO check inputs for null etc. YARN-654 + @Unstable -public class AMRMClientImpl extends AbstractService implements AMRMClient { +public class AMRMClientImpl + extends AbstractService implements AMRMClient { private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class); @@ -70,6 +80,57 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient { protected Resource clusterAvailableResources; protected int clusterNodeCount; + class ResourceRequestInfo { + ResourceRequest remoteRequest; + LinkedHashSet containerRequests; + + ResourceRequestInfo(Priority priority, String resourceName, + Resource capability) { + remoteRequest = BuilderUtils.newResourceRequest(priority, resourceName, + capability, 0); + containerRequests = new LinkedHashSet(); + } + } + + + /** + * Class compares Resource by memory then cpu in reverse order + */ + class ResourceReverseMemoryThenCpuComparator implements Comparator { + @Override + public int compare(Resource arg0, Resource arg1) { + int mem0 = arg0.getMemory(); + int mem1 = arg1.getMemory(); + int cpu0 = arg0.getVirtualCores(); + int cpu1 = arg1.getVirtualCores(); + if(mem0 == mem1) { + if(cpu0 == cpu1) { + return 0; + } + if(cpu0 < cpu1) { + return 1; + } + return -1; + } + if(mem0 < mem1) { + return 1; + } + return -1; + } + } + + static boolean canFit(Resource arg0, Resource arg1) { + int mem0 = arg0.getMemory(); + int mem1 = arg1.getMemory(); + int cpu0 = arg0.getVirtualCores(); + int cpu1 = arg1.getVirtualCores(); + + if(mem0 <= mem1 && cpu0 <= cpu1) { + return true; + } + return false; + } + //Key -> Priority //Value -> Map //Key->ResourceName (e.g., hostname, rackname, *) @@ -77,9 +138,9 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient { //Key->Resource Capability //Value->ResourceRequest protected final - Map>> + Map>> remoteRequestsTable = - new TreeMap>>(); + new TreeMap>>(); protected final Set ask = new TreeSet( new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator()); @@ -223,42 +284,47 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient { } @Override - public synchronized void addContainerRequest(ContainerRequest req) { + public synchronized void addContainerRequest(T req) { // Create resource requests - if(req.hosts != null) { + // add check for dup locations + if (req.hosts != null) { for (String host : req.hosts) { - addResourceRequest(req.priority, host, req.capability, req.containerCount); + addResourceRequest(req.priority, host, req.capability, + req.containerCount, req); } } - if(req.racks != null) { + if (req.racks != null) { for (String rack : req.racks) { - addResourceRequest(req.priority, rack, req.capability, req.containerCount); + addResourceRequest(req.priority, rack, req.capability, + req.containerCount, req); } } // Off-switch addResourceRequest(req.priority, ResourceRequest.ANY, req.capability, - req.containerCount); + req.containerCount, req); } @Override - public synchronized void removeContainerRequest(ContainerRequest req) { + public synchronized void removeContainerRequest(T req) { // Update resource requests - if(req.hosts != null) { + if (req.hosts != null) { for (String hostName : req.hosts) { - decResourceRequest(req.priority, hostName, req.capability, req.containerCount); + decResourceRequest(req.priority, hostName, req.capability, + req.containerCount, req); } } - - if(req.racks != null) { + + if (req.racks != null) { for (String rack : req.racks) { - decResourceRequest(req.priority, rack, req.capability, req.containerCount); + decResourceRequest(req.priority, rack, req.capability, + req.containerCount, req); } } - + decResourceRequest(req.priority, ResourceRequest.ANY, req.capability, - req.containerCount); + req.containerCount, req); } @Override @@ -276,6 +342,44 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient { return clusterNodeCount; } + @Override + public synchronized List> getMatchingRequests( + Priority priority, + String resourceName, + Resource capability) { + List> list = new LinkedList>(); + Map> remoteRequests = + this.remoteRequestsTable.get(priority); + if (remoteRequests == null) { + return list; + } + TreeMap reqMap = remoteRequests + .get(resourceName); + if (reqMap == null) { + return list; + } + + ResourceRequestInfo resourceRequestInfo = reqMap.get(capability); + if (resourceRequestInfo != null) { + list.add(resourceRequestInfo.containerRequests); + return list; + } + + // no exact match. Container may be larger than what was requested. + // get all resources <= capability. map is reverse sorted. + SortedMap tailMap = + reqMap.tailMap(capability); + for(Map.Entry entry : tailMap.entrySet()) { + if(canFit(entry.getKey(), capability)) { + // match found that fits in the larger resource + list.add(entry.getValue().containerRequests); + } + } + + // no match found + return list; + } + private void addResourceRequestToAsk(ResourceRequest remoteRequest) { // This code looks weird but is needed because of the following scenario. // A ResourceRequest is removed from the remoteRequestTable. A 0 container @@ -294,44 +398,57 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient { } private void addResourceRequest(Priority priority, String resourceName, - Resource capability, int containerCount) { - Map> remoteRequests = + Resource capability, int containerCount, T req) { + Map> remoteRequests = this.remoteRequestsTable.get(priority); if (remoteRequests == null) { - remoteRequests = new HashMap>(); + remoteRequests = + new HashMap>(); this.remoteRequestsTable.put(priority, remoteRequests); if (LOG.isDebugEnabled()) { LOG.debug("Added priority=" + priority); } } - Map reqMap = remoteRequests.get(resourceName); + TreeMap reqMap = + remoteRequests.get(resourceName); if (reqMap == null) { - reqMap = new HashMap(); + // capabilities are stored in reverse sorted order. smallest last. + reqMap = new TreeMap( + new ResourceReverseMemoryThenCpuComparator()); remoteRequests.put(resourceName, reqMap); } - ResourceRequest remoteRequest = reqMap.get(capability); - if (remoteRequest == null) { - remoteRequest = BuilderUtils. - newResourceRequest(priority, resourceName, capability, 0); - reqMap.put(capability, remoteRequest); + ResourceRequestInfo resourceRequestInfo = reqMap.get(capability); + if (resourceRequestInfo == null) { + resourceRequestInfo = + new ResourceRequestInfo(priority, resourceName, capability); + reqMap.put(capability, resourceRequestInfo); } - remoteRequest.setNumContainers(remoteRequest.getNumContainers() + containerCount); + resourceRequestInfo.remoteRequest.setNumContainers( + resourceRequestInfo.remoteRequest.getNumContainers() + containerCount); + + if(req instanceof StoredContainerRequest) { + resourceRequestInfo.containerRequests.add(req); + } // Note this down for next interaction with ResourceManager - addResourceRequestToAsk(remoteRequest); + addResourceRequestToAsk(resourceRequestInfo.remoteRequest); if (LOG.isDebugEnabled()) { LOG.debug("addResourceRequest:" + " applicationId=" + appAttemptId + " priority=" + priority.getPriority() + " resourceName=" + resourceName + " numContainers=" - + remoteRequest.getNumContainers() + " #asks=" + ask.size()); + + resourceRequestInfo.remoteRequest.getNumContainers() + + " #asks=" + ask.size()); } } - private void decResourceRequest(Priority priority, String resourceName, - Resource capability, int containerCount) { - Map> remoteRequests = + private void decResourceRequest(Priority priority, + String resourceName, + Resource capability, + int containerCount, + T req) { + Map> remoteRequests = this.remoteRequestsTable.get(priority); if(remoteRequests == null) { @@ -342,7 +459,7 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient { return; } - Map reqMap = remoteRequests.get(resourceName); + Map reqMap = remoteRequests.get(resourceName); if (reqMap == null) { if (LOG.isDebugEnabled()) { LOG.debug("Not decrementing resource as " + resourceName @@ -350,28 +467,34 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient { } return; } - ResourceRequest remoteRequest = reqMap.get(capability); + ResourceRequestInfo resourceRequestInfo = reqMap.get(capability); if (LOG.isDebugEnabled()) { LOG.debug("BEFORE decResourceRequest:" + " applicationId=" + appAttemptId + " priority=" + priority.getPriority() + " resourceName=" + resourceName + " numContainers=" - + remoteRequest.getNumContainers() + " #asks=" + ask.size()); + + resourceRequestInfo.remoteRequest.getNumContainers() + + " #asks=" + ask.size()); } - remoteRequest. - setNumContainers(remoteRequest.getNumContainers() - containerCount); - if(remoteRequest.getNumContainers() < 0) { + resourceRequestInfo.remoteRequest.setNumContainers( + resourceRequestInfo.remoteRequest.getNumContainers() - containerCount); + + if(req instanceof StoredContainerRequest) { + resourceRequestInfo.containerRequests.remove(req); + } + + if(resourceRequestInfo.remoteRequest.getNumContainers() < 0) { // guard against spurious removals - remoteRequest.setNumContainers(0); + resourceRequestInfo.remoteRequest.setNumContainers(0); } // send the ResourceRequest to RM even if is 0 because it needs to override // a previously sent value. If ResourceRequest was not sent previously then // sending 0 aught to be a no-op on RM - addResourceRequestToAsk(remoteRequest); + addResourceRequestToAsk(resourceRequestInfo.remoteRequest); // delete entries from map if no longer needed - if (remoteRequest.getNumContainers() == 0) { + if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) { reqMap.remove(capability); if (reqMap.size() == 0) { remoteRequests.remove(resourceName); @@ -385,7 +508,8 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient { LOG.info("AFTER decResourceRequest:" + " applicationId=" + appAttemptId + " priority=" + priority.getPriority() + " resourceName=" + resourceName + " numContainers=" - + remoteRequest.getNumContainers() + " #asks=" + ask.size()); + + resourceRequestInfo.remoteRequest.getNumContainers() + + " #asks=" + ask.size()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java index 2764061a274..74a1d163412 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.Set; import java.util.TreeSet; @@ -50,27 +51,38 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.AMRMClient.StoredContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.service.Service.STATE; +import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; public class TestAMRMClient { - Configuration conf = null; - MiniYARNCluster yarnCluster = null; - YarnClientImpl yarnClient = null; - List nodeReports = null; - ApplicationAttemptId attemptId = null; - int nodeCount = 3; + static Configuration conf = null; + static MiniYARNCluster yarnCluster = null; + static YarnClientImpl yarnClient = null; + static List nodeReports = null; + static ApplicationAttemptId attemptId = null; + static int nodeCount = 3; - @Before - public void setup() throws YarnRemoteException, IOException { + static Resource capability; + static Priority priority; + static String node; + static String rack; + static String[] nodes; + static String[] racks; + + @BeforeClass + public static void setup() throws Exception { // start minicluster conf = new YarnConfiguration(); yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1); @@ -84,7 +96,17 @@ public class TestAMRMClient { // get node info nodeReports = yarnClient.getNodeReports(); - + + priority = BuilderUtils.newPriority(1); + capability = BuilderUtils.newResource(1024, 1); + node = nodeReports.get(0).getNodeId().getHost(); + rack = nodeReports.get(0).getRackName(); + nodes = new String[]{ node }; + racks = new String[]{ rack }; + } + + @Before + public void startApp() throws Exception { // submit new app GetNewApplicationResponse newApp = yarnClient.getNewApplication(); ApplicationId appId = newApp.getApplicationId(); @@ -125,7 +147,12 @@ public class TestAMRMClient { } @After - public void tearDown() { + public void cancelApp() { + attemptId = null; + } + + @AfterClass + public static void tearDown() { if (yarnClient != null && yarnClient.getServiceState() == STATE.STARTED) { yarnClient.stop(); } @@ -133,13 +160,235 @@ public class TestAMRMClient { yarnCluster.stop(); } } + + @Test (timeout=60000) + public void testAMRMClientMatchingFit() throws YarnRemoteException, IOException { + AMRMClientImpl amClient = null; + try { + // start am rm client + amClient = new AMRMClientImpl(attemptId); + amClient.init(conf); + amClient.start(); + amClient.registerApplicationMaster("Host", 10000, ""); + + Resource capability1 = BuilderUtils.newResource(1024, 2); + Resource capability2 = BuilderUtils.newResource(1024, 1); + Resource capability3 = BuilderUtils.newResource(1000, 2); + Resource capability4 = BuilderUtils.newResource(2000, 1); + Resource capability5 = BuilderUtils.newResource(1000, 3); + Resource capability6 = BuilderUtils.newResource(2000, 1); + + StoredContainerRequest storedContainer1 = + new StoredContainerRequest(capability1, nodes, racks, priority); + StoredContainerRequest storedContainer2 = + new StoredContainerRequest(capability2, nodes, racks, priority); + StoredContainerRequest storedContainer3 = + new StoredContainerRequest(capability3, nodes, racks, priority); + StoredContainerRequest storedContainer4 = + new StoredContainerRequest(capability4, nodes, racks, priority); + StoredContainerRequest storedContainer5 = + new StoredContainerRequest(capability5, nodes, racks, priority); + StoredContainerRequest storedContainer6 = + new StoredContainerRequest(capability6, nodes, racks, priority); + amClient.addContainerRequest(storedContainer1); + amClient.addContainerRequest(storedContainer2); + amClient.addContainerRequest(storedContainer3); + amClient.addContainerRequest(storedContainer4); + amClient.addContainerRequest(storedContainer5); + amClient.addContainerRequest(storedContainer6); + + // test matching of containers + List> matches; + StoredContainerRequest storedRequest; + // exact match + Resource testCapability1 = BuilderUtils.newResource(1024, 2); + matches = amClient.getMatchingRequests(priority, node, testCapability1); + verifyMatches(matches, 1); + storedRequest = matches.get(0).iterator().next(); + assertTrue(storedContainer1 == storedRequest); + amClient.removeContainerRequest(storedContainer1); + + // exact matching with order maintained + Resource testCapability2 = BuilderUtils.newResource(2000, 1); + matches = amClient.getMatchingRequests(priority, node, testCapability2); + verifyMatches(matches, 2); + // must be returned in the order they were made + int i = 0; + for(StoredContainerRequest storedRequest1 : matches.get(0)) { + if(i++ == 0) { + assertTrue(storedContainer4 == storedRequest1); + } else { + assertTrue(storedContainer6 == storedRequest1); + } + } + amClient.removeContainerRequest(storedContainer6); + + // matching with larger container. all requests returned + Resource testCapability3 = BuilderUtils.newResource(4000, 4); + matches = amClient.getMatchingRequests(priority, node, testCapability3); + assert(matches.size() == 4); + + Resource testCapability4 = BuilderUtils.newResource(1024, 2); + matches = amClient.getMatchingRequests(priority, node, testCapability4); + assert(matches.size() == 2); + // verify non-fitting containers are not returned and fitting ones are + for(Collection testSet : matches) { + assertTrue(testSet.size() == 1); + StoredContainerRequest testRequest = testSet.iterator().next(); + assertTrue(testRequest != storedContainer4); + assertTrue(testRequest != storedContainer5); + assert(testRequest == storedContainer2 || + testRequest == storedContainer3); + } + + Resource testCapability5 = BuilderUtils.newResource(512, 4); + matches = amClient.getMatchingRequests(priority, node, testCapability5); + assert(matches.size() == 0); + + amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, + null, null); + + } finally { + if (amClient != null && amClient.getServiceState() == STATE.STARTED) { + amClient.stop(); + } + } + } + + private void verifyMatches( + List> matches, + int matchSize) { + assertTrue(matches.size() == 1); + assertTrue(matches.get(0).size() == matchSize); + } + + @Test (timeout=60000) + public void testAMRMClientMatchStorage() throws YarnRemoteException, IOException { + AMRMClientImpl amClient = null; + try { + // start am rm client + amClient = new AMRMClientImpl(attemptId); + amClient.init(conf); + amClient.start(); + amClient.registerApplicationMaster("Host", 10000, ""); + + Priority priority1 = Records.newRecord(Priority.class); + priority1.setPriority(2); + + StoredContainerRequest storedContainer1 = + new StoredContainerRequest(capability, nodes, racks, priority); + StoredContainerRequest storedContainer2 = + new StoredContainerRequest(capability, nodes, racks, priority); + StoredContainerRequest storedContainer3 = + new StoredContainerRequest(capability, null, null, priority1); + amClient.addContainerRequest(storedContainer1); + amClient.addContainerRequest(storedContainer2); + amClient.addContainerRequest(storedContainer3); + + // test addition and storage + int containersRequestedAny = amClient.remoteRequestsTable.get(priority) + .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers(); + assertTrue(containersRequestedAny == 2); + containersRequestedAny = amClient.remoteRequestsTable.get(priority1) + .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers(); + assertTrue(containersRequestedAny == 1); + List> matches = + amClient.getMatchingRequests(priority, node, capability); + verifyMatches(matches, 2); + matches = amClient.getMatchingRequests(priority, rack, capability); + verifyMatches(matches, 2); + matches = + amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability); + verifyMatches(matches, 2); + matches = amClient.getMatchingRequests(priority1, rack, capability); + assertTrue(matches.isEmpty()); + matches = + amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability); + verifyMatches(matches, 1); + + // test removal + amClient.removeContainerRequest(storedContainer3); + matches = amClient.getMatchingRequests(priority, node, capability); + verifyMatches(matches, 2); + amClient.removeContainerRequest(storedContainer2); + matches = amClient.getMatchingRequests(priority, node, capability); + verifyMatches(matches, 1); + matches = amClient.getMatchingRequests(priority, rack, capability); + verifyMatches(matches, 1); + + // test matching of containers + StoredContainerRequest storedRequest = matches.get(0).iterator().next(); + assertTrue(storedContainer1 == storedRequest); + amClient.removeContainerRequest(storedContainer1); + matches = + amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability); + assertTrue(matches.isEmpty()); + matches = + amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability); + assertTrue(matches.isEmpty()); + // 0 requests left. everything got cleaned up + assertTrue(amClient.remoteRequestsTable.isEmpty()); + + // go through an exemplary allocation, matching and release cycle + amClient.addContainerRequest(storedContainer1); + amClient.addContainerRequest(storedContainer3); + // RM should allocate container within 2 calls to allocate() + int allocatedContainerCount = 0; + int iterationsLeft = 2; + while (allocatedContainerCount < 2 + && iterationsLeft-- > 0) { + AllocateResponse allocResponse = amClient.allocate(0.1f); + assertTrue(amClient.ask.size() == 0); + assertTrue(amClient.release.size() == 0); + + assertTrue(nodeCount == amClient.getClusterNodeCount()); + allocatedContainerCount += allocResponse.getAllocatedContainers().size(); + for(Container container : allocResponse.getAllocatedContainers()) { + ContainerRequest expectedRequest = + container.getPriority().equals(storedContainer1.getPriority()) ? + storedContainer1 : storedContainer3; + matches = amClient.getMatchingRequests(container.getPriority(), + ResourceRequest.ANY, + container.getResource()); + // test correct matched container is returned + verifyMatches(matches, 1); + ContainerRequest matchedRequest = matches.get(0).iterator().next(); + assertTrue(matchedRequest == expectedRequest); + + // assign this container, use it and release it + amClient.releaseAssignedContainer(container.getId()); + } + if(allocatedContainerCount < containersRequestedAny) { + // sleep to let NM's heartbeat to RM and trigger allocations + sleep(1000); + } + } + + assertTrue(allocatedContainerCount == 2); + assertTrue(amClient.release.size() == 2); + assertTrue(amClient.ask.size() == 0); + AllocateResponse allocResponse = amClient.allocate(0.1f); + assertTrue(amClient.release.size() == 0); + assertTrue(amClient.ask.size() == 0); + assertTrue(allocResponse.getAllocatedContainers().size() == 0); + + + amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, + null, null); + + } finally { + if (amClient != null && amClient.getServiceState() == STATE.STARTED) { + amClient.stop(); + } + } + } @Test (timeout=60000) public void testAMRMClient() throws YarnRemoteException, IOException { - AMRMClientImpl amClient = null; + AMRMClientImpl amClient = null; try { // start am rm client - amClient = new AMRMClientImpl(attemptId); + amClient = new AMRMClientImpl(attemptId); amClient.init(conf); amClient.start(); @@ -156,36 +405,27 @@ public class TestAMRMClient { } } } - - - private void testAllocation(final AMRMClientImpl amClient) + + private void testAllocation(final AMRMClientImpl amClient) throws YarnRemoteException, IOException { // setup container request - final Resource capability = Records.newRecord(Resource.class); - final Priority priority = Records.newRecord(Priority.class); - priority.setPriority(0); - capability.setMemory(1024); - String node = nodeReports.get(0).getNodeId().getHost(); - String rack = nodeReports.get(0).getRackName(); - final String[] nodes = { node }; - final String[] racks = { rack }; assertTrue(amClient.ask.size() == 0); assertTrue(amClient.release.size() == 0); - amClient.addContainerRequest(new ContainerRequest(capability, nodes, - racks, priority, 1)); - amClient.addContainerRequest(new ContainerRequest(capability, nodes, - racks, priority, 3)); - amClient.removeContainerRequest(new ContainerRequest(capability, nodes, - racks, priority, 2)); + amClient.addContainerRequest( + new ContainerRequest(capability, nodes, racks, priority, 1)); + amClient.addContainerRequest( + new ContainerRequest(capability, nodes, racks, priority, 3)); + amClient.removeContainerRequest( + new ContainerRequest(capability, nodes, racks, priority, 2)); int containersRequestedNode = amClient.remoteRequestsTable.get(priority) - .get(node).get(capability).getNumContainers(); + .get(node).get(capability).remoteRequest.getNumContainers(); int containersRequestedRack = amClient.remoteRequestsTable.get(priority) - .get(rack).get(capability).getNumContainers(); + .get(rack).get(capability).remoteRequest.getNumContainers(); int containersRequestedAny = amClient.remoteRequestsTable.get(priority) - .get(ResourceRequest.ANY).get(capability).getNumContainers(); + .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers(); assertTrue(containersRequestedNode == 2); assertTrue(containersRequestedRack == 2); @@ -221,8 +461,8 @@ public class TestAMRMClient { assertTrue(amClient.ask.size() == 0); // need to tell the AMRMClient that we dont need these resources anymore - amClient.removeContainerRequest(new ContainerRequest(capability, nodes, - racks, priority, 2)); + amClient.removeContainerRequest( + new ContainerRequest(capability, nodes, racks, priority, 2)); assertTrue(amClient.ask.size() == 3); // send 0 container count request for resources that are no longer needed ResourceRequest snoopRequest = amClient.ask.iterator().next(); @@ -241,8 +481,9 @@ public class TestAMRMClient { new Answer() { public AllocateResponse answer(InvocationOnMock invocation) throws Exception { - amClient.removeContainerRequest(new ContainerRequest(capability, - nodes, racks, priority, 2)); + amClient.removeContainerRequest( + new ContainerRequest(capability, nodes, + racks, priority, 2)); throw new Exception(); } }); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java index 6b9951b2057..b637b5071c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import junit.framework.Assert; @@ -39,6 +40,9 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -48,9 +52,11 @@ public class TestAMRMClientAsync { private static final Log LOG = LogFactory.getLog(TestAMRMClientAsync.class); + @SuppressWarnings("unchecked") @Test(timeout=10000) public void testAMRMClientAsync() throws Exception { Configuration conf = new Configuration(); + final AtomicBoolean heartbeatBlock = new AtomicBoolean(true); List completed1 = Arrays.asList( BuilderUtils.newContainerStatus( BuilderUtils.newContainerId(0, 0, 0, 0), @@ -65,20 +71,38 @@ public class TestAMRMClientAsync { new ArrayList(), new ArrayList()); TestCallbackHandler callbackHandler = new TestCallbackHandler(); - AMRMClient client = mock(AMRMClient.class); - final AtomicBoolean secondHeartbeatReceived = new AtomicBoolean(false); + final AMRMClient client = mock(AMRMClientImpl.class); + final AtomicInteger secondHeartbeatSync = new AtomicInteger(0); when(client.allocate(anyFloat())).thenReturn(response1).thenAnswer(new Answer() { @Override public AllocateResponse answer(InvocationOnMock invocation) throws Throwable { - secondHeartbeatReceived.set(true); + secondHeartbeatSync.incrementAndGet(); + while(heartbeatBlock.get()) { + synchronized(heartbeatBlock) { + heartbeatBlock.wait(); + } + } + secondHeartbeatSync.incrementAndGet(); return response2; } }).thenReturn(emptyResponse); when(client.registerApplicationMaster(anyString(), anyInt(), anyString())) .thenReturn(null); + when(client.getClusterAvailableResources()).thenAnswer(new Answer() { + @Override + public Resource answer(InvocationOnMock invocation) + throws Throwable { + // take client lock to simulate behavior of real impl + synchronized (client) { + Thread.sleep(10); + } + return null; + } + }); - AMRMClientAsync asyncClient = new AMRMClientAsync(client, 20, callbackHandler); + AMRMClientAsync asyncClient = + new AMRMClientAsync(client, 20, callbackHandler); asyncClient.init(conf); asyncClient.start(); asyncClient.registerApplicationMaster("localhost", 1234, null); @@ -86,10 +110,21 @@ public class TestAMRMClientAsync { // while the CallbackHandler will still only be processing the first response, // heartbeater thread should still be sending heartbeats. // To test this, wait for the second heartbeat to be received. - while (!secondHeartbeatReceived.get()) { + while (secondHeartbeatSync.get() < 1) { Thread.sleep(10); } + // heartbeat will be blocked. make sure we can call client methods at this + // time. Checks that heartbeat is not holding onto client lock + assert(secondHeartbeatSync.get() < 2); + asyncClient.getClusterAvailableResources(); + // method returned. now unblock heartbeat + assert(secondHeartbeatSync.get() < 2); + synchronized (heartbeatBlock) { + heartbeatBlock.set(false); + heartbeatBlock.notifyAll(); + } + // allocated containers should come before completed containers Assert.assertEquals(null, callbackHandler.takeCompletedContainers()); @@ -110,6 +145,73 @@ public class TestAMRMClientAsync { Assert.assertEquals(null, callbackHandler.takeCompletedContainers()); } + @Test(timeout=10000) + public void testAMRMClientAsyncException() throws Exception { + Configuration conf = new Configuration(); + TestCallbackHandler callbackHandler = new TestCallbackHandler(); + @SuppressWarnings("unchecked") + AMRMClient client = mock(AMRMClientImpl.class); + String exStr = "TestException"; + YarnRemoteException mockException = mock(YarnRemoteException.class); + when(mockException.getMessage()).thenReturn(exStr); + when(client.allocate(anyFloat())).thenThrow(mockException); + + AMRMClientAsync asyncClient = + new AMRMClientAsync(client, 20, callbackHandler); + asyncClient.init(conf); + asyncClient.start(); + + synchronized (callbackHandler.notifier) { + asyncClient.registerApplicationMaster("localhost", 1234, null); + while(callbackHandler.savedException == null) { + try { + callbackHandler.notifier.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + Assert.assertTrue(callbackHandler.savedException.getMessage().contains(exStr)); + + asyncClient.stop(); + // stopping should have joined all threads and completed all callbacks + Assert.assertTrue(callbackHandler.callbackCount == 0); + } + + @Test(timeout=10000) + public void testAMRMClientAsyncReboot() throws Exception { + Configuration conf = new Configuration(); + TestCallbackHandler callbackHandler = new TestCallbackHandler(); + @SuppressWarnings("unchecked") + AMRMClient client = mock(AMRMClientImpl.class); + + final AllocateResponse rebootResponse = createAllocateResponse( + new ArrayList(), new ArrayList()); + rebootResponse.setReboot(true); + when(client.allocate(anyFloat())).thenReturn(rebootResponse); + + AMRMClientAsync asyncClient = + new AMRMClientAsync(client, 20, callbackHandler); + asyncClient.init(conf); + asyncClient.start(); + + synchronized (callbackHandler.notifier) { + asyncClient.registerApplicationMaster("localhost", 1234, null); + while(callbackHandler.reboot == false) { + try { + callbackHandler.notifier.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + asyncClient.stop(); + // stopping should have joined all threads and completed all callbacks + Assert.assertTrue(callbackHandler.callbackCount == 0); + } + private AllocateResponse createAllocateResponse( List completed, List allocated) { AllocateResponse response = BuilderUtils.newAllocateResponse(0, completed, allocated, @@ -120,6 +222,11 @@ public class TestAMRMClientAsync { private class TestCallbackHandler implements AMRMClientAsync.CallbackHandler { private volatile List completedContainers; private volatile List allocatedContainers; + Exception savedException = null; + boolean reboot = false; + Object notifier = new Object(); + + int callbackCount = 0; public List takeCompletedContainers() { List ret = completedContainers; @@ -176,9 +283,28 @@ public class TestAMRMClientAsync { } @Override - public void onRebootRequest() {} + public void onRebootRequest() { + reboot = true; + synchronized (notifier) { + notifier.notifyAll(); + } + } @Override public void onNodesUpdated(List updatedNodes) {} + + @Override + public float getProgress() { + callbackCount++; + return 0.5f; + } + + @Override + public void onError(Exception e) { + savedException = e; + synchronized (notifier) { + notifier.notifyAll(); + } + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java index 3b4439ea280..34ca1ae754d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java @@ -64,7 +64,7 @@ public class TestNMClient { Configuration conf = null; MiniYARNCluster yarnCluster = null; YarnClientImpl yarnClient = null; - AMRMClientImpl rmClient = null; + AMRMClientImpl rmClient = null; NMClientImpl nmClient = null; List nodeReports = null; ApplicationAttemptId attemptId = null; @@ -136,7 +136,7 @@ public class TestNMClient { } // start am rm client - rmClient = new AMRMClientImpl(attemptId); + rmClient = new AMRMClientImpl(attemptId); rmClient.init(conf); rmClient.start(); assertNotNull(rmClient); @@ -185,7 +185,8 @@ public class TestNMClient { null, null); } - private Set allocateContainers(AMRMClientImpl rmClient, int num) + private Set allocateContainers( + AMRMClientImpl rmClient, int num) throws YarnRemoteException, IOException { // setup container request Resource capability = Resource.newInstance(1024, 0); @@ -201,7 +202,8 @@ public class TestNMClient { } int containersRequestedAny = rmClient.remoteRequestsTable.get(priority) - .get(ResourceRequest.ANY).get(capability).getNumContainers(); + .get(ResourceRequest.ANY).get(capability).remoteRequest + .getNumContainers(); // RM should allocate container within 2 calls to allocate() int allocatedContainerCount = 0;