From bcd93d2fda8356403ff6614ddb56b9e54d1a9c49 Mon Sep 17 00:00:00 2001 From: Siddharth Seth Date: Thu, 3 Jan 2013 19:12:30 +0000 Subject: [PATCH] merge YARN-103 from trunk. Add a yarn AM-RM client module. Contributed by Bikas Saha. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1428555 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 2 + .../hadoop-yarn/hadoop-yarn-client/pom.xml | 8 +- .../apache/hadoop/yarn/client/AMRMClient.java | 153 +++++++ .../hadoop/yarn/client/AMRMClientImpl.java | 410 ++++++++++++++++++ .../hadoop/yarn/client/TestAMRMClient.java | 297 +++++++++++++ 5 files changed, 869 insertions(+), 1 deletion(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 59510ade275..d527c63f8e0 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -11,6 +11,8 @@ Release 2.0.3-alpha - Unreleased YARN-230. RM Restart phase 1 - includes support for saving/restarting all applications on an RM bounce. (Bikas Saha via acmurthy) + YARN-103. Add a yarn AM-RM client module. (Bikas Saha via sseth) + IMPROVEMENTS YARN-223. Update process tree instead of getting new process trees. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml index e44a5308999..c9d474779e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml @@ -36,7 +36,13 @@ org.apache.hadoop hadoop-yarn-server-resourcemanager - test + test + + + org.apache.hadoop + hadoop-yarn-server-tests + test + test-jar org.apache.hadoop diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java new file mode 100644 index 00000000000..1fa86d2cc24 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java @@ -0,0 +1,153 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.client; + + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.service.Service; + +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface AMRMClient extends Service { + + /** + * Value used to define no locality + */ + static final String ANY = "*"; + + /** + * Object to represent container request for resources. + * Resources may be localized to nodes and racks. + * Resources may be assigned priorities. + * Can ask for multiple containers of a given type. + */ + public static class ContainerRequest { + Resource capability; + String[] hosts; + String[] racks; + Priority priority; + int containerCount; + + public ContainerRequest(Resource capability, String[] hosts, + String[] racks, Priority priority, int containerCount) { + this.capability = capability; + this.hosts = (hosts != null ? hosts.clone() : null); + this.racks = (racks != null ? racks.clone() : null); + this.priority = priority; + this.containerCount = containerCount; + } + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Capability[").append(capability).append("]"); + sb.append("Priority[").append(priority).append("]"); + sb.append("ContainerCount[").append(containerCount).append("]"); + return sb.toString(); + } + } + + /** + * Register the application master. This must be called before any + * other interaction + * @param appHostName Name of the host on which master is running + * @param appHostPort Port master is listening on + * @param appTrackingUrl URL at which the master info can be seen + * @return RegisterApplicationMasterResponse + * @throws YarnRemoteException + */ + public RegisterApplicationMasterResponse + registerApplicationMaster(String appHostName, + int appHostPort, + String appTrackingUrl) + throws YarnRemoteException; + + /** + * Request additional containers and receive new container allocations. + * Requests made via addContainerRequest are sent to the + * ResourceManager. New containers assigned to the master are + * retrieved. Status of completed containers and node health updates are + * also retrieved. + * This also doubles up as a heartbeat to the ResourceManager and must be + * made periodically. + * The call may not always return any new allocations of containers. + * App should not make concurrent allocate requests. May cause request loss. + * @param progressIndicator Indicates progress made by the master + * @return the response of the allocate request + * @throws YarnRemoteException + */ + public AllocateResponse allocate(float progressIndicator) + throws YarnRemoteException; + + /** + * Unregister the application master. This must be called in the end. + * @param appStatus Success/Failure status of the master + * @param appMessage Diagnostics message on failure + * @param appTrackingUrl New URL to get master info + * @throws YarnRemoteException + */ + public void unregisterApplicationMaster(FinalApplicationStatus appStatus, + String appMessage, + String appTrackingUrl) + throws YarnRemoteException; + + /** + * Request containers for resources before calling allocate + * @param req Resource request + */ + public void addContainerRequest(ContainerRequest req); + + /** + * Remove previous container request. The previous container request may have + * already been sent to the ResourceManager. So even after the remove request + * the app must be prepared to receive an allocation for the previous request + * even after the remove request + * @param req Resource request + */ + public void removeContainerRequest(ContainerRequest req); + + /** + * Release containers assigned by the Resource Manager. If the app cannot use + * the container or wants to give up the container then it can release them. + * The app needs to make new requests for the released resource capability if + * it still needs it. eg. it released non-local resources + * @param containerId + */ + public void releaseAssignedContainer(ContainerId containerId); + + /** + * Get the currently available resources in the cluster. + * A valid value is available after a call to allocate has been made + * @return Currently available resources + */ + public Resource getClusterAvailableResources(); + + /** + * Get the current number of nodes in the cluster. + * A valid values is available after a call to allocate has been made + * @return Current number of nodes in the cluster + */ + public int getClusterNodeCount(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java new file mode 100644 index 00000000000..15c250e8d5f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java @@ -0,0 +1,410 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.client; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.api.AMRMProtocol; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.AMResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.service.AbstractService; +import org.apache.hadoop.yarn.util.BuilderUtils; + +@Unstable +public class AMRMClientImpl extends AbstractService implements AMRMClient { + + private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class); + + private final RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + private int lastResponseId = 0; + + protected AMRMProtocol rmClient; + protected final ApplicationAttemptId appAttemptId; + protected Resource clusterAvailableResources; + protected int clusterNodeCount; + + //Key -> Priority + //Value -> Map + //Key->ResourceName (e.g., hostname, rackname, *) + //Value->Map + //Key->Resource Capability + //Value->ResourceRequest + protected final + Map>> + remoteRequestsTable = + new TreeMap>>(); + + protected final Set ask = new TreeSet( + new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator()); + protected final Set release = new TreeSet(); + + public AMRMClientImpl(ApplicationAttemptId appAttemptId) { + super(AMRMClientImpl.class.getName()); + this.appAttemptId = appAttemptId; + } + + @Override + public synchronized void init(Configuration conf) { + super.init(conf); + } + + @Override + public synchronized void start() { + final YarnConfiguration conf = new YarnConfiguration(getConfig()); + final YarnRPC rpc = YarnRPC.create(conf); + final InetSocketAddress rmAddress = conf.getSocketAddr( + YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + + UserGroupInformation currentUser; + try { + currentUser = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + throw new YarnException(e); + } + + if (UserGroupInformation.isSecurityEnabled()) { + String tokenURLEncodedStr = System.getenv().get( + ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME); + Token token = new Token(); + + try { + token.decodeFromUrlString(tokenURLEncodedStr); + } catch (IOException e) { + throw new YarnException(e); + } + + SecurityUtil.setTokenService(token, rmAddress); + if (LOG.isDebugEnabled()) { + LOG.debug("AppMasterToken is " + token); + } + currentUser.addToken(token); + } + + rmClient = currentUser.doAs(new PrivilegedAction() { + @Override + public AMRMProtocol run() { + return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, + conf); + } + }); + LOG.debug("Connecting to ResourceManager at " + rmAddress); + super.start(); + } + + @Override + public synchronized void stop() { + RPC.stopProxy(this.rmClient); + super.stop(); + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + String appHostName, int appHostPort, String appTrackingUrl) + throws YarnRemoteException { + // do this only once ??? + RegisterApplicationMasterRequest request = recordFactory + .newRecordInstance(RegisterApplicationMasterRequest.class); + synchronized (this) { + request.setApplicationAttemptId(appAttemptId); + } + request.setHost(appHostName); + request.setRpcPort(appHostPort); + if(appTrackingUrl != null) { + request.setTrackingUrl(appTrackingUrl); + } + RegisterApplicationMasterResponse response = rmClient + .registerApplicationMaster(request); + return response; + } + + @Override + public AllocateResponse allocate(float progressIndicator) + throws YarnRemoteException { + AllocateResponse allocateResponse = null; + ArrayList askList = null; + ArrayList releaseList = null; + AllocateRequest allocateRequest = null; + + try { + synchronized (this) { + askList = new ArrayList(ask); + releaseList = new ArrayList(release); + // optimistically clear this collection assuming no RPC failure + ask.clear(); + release.clear(); + allocateRequest = BuilderUtils + .newAllocateRequest(appAttemptId, lastResponseId, progressIndicator, + askList, releaseList); + } + + allocateResponse = rmClient.allocate(allocateRequest); + AMResponse response = allocateResponse.getAMResponse(); + + synchronized (this) { + // update these on successful RPC + clusterNodeCount = allocateResponse.getNumClusterNodes(); + lastResponseId = response.getResponseId(); + clusterAvailableResources = response.getAvailableResources(); + } + } finally { + // TODO how to differentiate remote yarn exception vs error in rpc + if(allocateResponse == null) { + // we hit an exception in allocate() + // preserve ask and release for next call to allocate() + synchronized (this) { + release.addAll(releaseList); + // requests could have been added or deleted during call to allocate + // If requests were added/removed then there is nothing to do since + // the ResourceRequest object in ask would have the actual new value. + // If ask does not have this ResourceRequest then it was unchanged and + // so we can add the value back safely. + // This assumes that there will no concurrent calls to allocate() and + // so we dont have to worry about ask being changed in the + // synchronized block at the beginning of this method. + for(ResourceRequest oldAsk : askList) { + if(!ask.contains(oldAsk)) { + ask.add(oldAsk); + } + } + } + } + } + return allocateResponse; + } + + @Override + public void unregisterApplicationMaster(FinalApplicationStatus appStatus, + String appMessage, String appTrackingUrl) throws YarnRemoteException { + FinishApplicationMasterRequest request = recordFactory + .newRecordInstance(FinishApplicationMasterRequest.class); + request.setAppAttemptId(appAttemptId); + request.setFinishApplicationStatus(appStatus); + if(appMessage != null) { + request.setDiagnostics(appMessage); + } + if(appTrackingUrl != null) { + request.setTrackingUrl(appTrackingUrl); + } + rmClient.finishApplicationMaster(request); + } + + @Override + public synchronized void addContainerRequest(ContainerRequest req) { + // Create resource requests + if(req.hosts != null) { + for (String host : req.hosts) { + addResourceRequest(req.priority, host, req.capability, req.containerCount); + } + } + + if(req.racks != null) { + for (String rack : req.racks) { + addResourceRequest(req.priority, rack, req.capability, req.containerCount); + } + } + + // Off-switch + addResourceRequest(req.priority, ANY, req.capability, req.containerCount); + } + + @Override + public synchronized void removeContainerRequest(ContainerRequest req) { + // Update resource requests + if(req.hosts != null) { + for (String hostName : req.hosts) { + decResourceRequest(req.priority, hostName, req.capability, req.containerCount); + } + } + + if(req.racks != null) { + for (String rack : req.racks) { + decResourceRequest(req.priority, rack, req.capability, req.containerCount); + } + } + + decResourceRequest(req.priority, ANY, req.capability, req.containerCount); + } + + @Override + public synchronized void releaseAssignedContainer(ContainerId containerId) { + release.add(containerId); + } + + @Override + public synchronized Resource getClusterAvailableResources() { + return clusterAvailableResources; + } + + @Override + public synchronized int getClusterNodeCount() { + return clusterNodeCount; + } + + private void addResourceRequestToAsk(ResourceRequest remoteRequest) { + // This code looks weird but is needed because of the following scenario. + // A ResourceRequest is removed from the remoteRequestTable. A 0 container + // request is added to 'ask' to notify the RM about not needing it any more. + // Before the call to allocate, the user now requests more containers. If + // the locations of the 0 size request and the new request are the same + // (with the difference being only container count), then the set comparator + // will consider both to be the same and not add the new request to ask. So + // we need to check for the "same" request being present and remove it and + // then add it back. The comparator is container count agnostic. + // This should happen only rarely but we do need to guard against it. + if(ask.contains(remoteRequest)) { + ask.remove(remoteRequest); + } + ask.add(remoteRequest); + } + + private void addResourceRequest(Priority priority, String resourceName, + Resource capability, int containerCount) { + Map> remoteRequests = + this.remoteRequestsTable.get(priority); + if (remoteRequests == null) { + remoteRequests = new HashMap>(); + this.remoteRequestsTable.put(priority, remoteRequests); + if (LOG.isDebugEnabled()) { + LOG.debug("Added priority=" + priority); + } + } + Map reqMap = remoteRequests.get(resourceName); + if (reqMap == null) { + reqMap = new HashMap(); + remoteRequests.put(resourceName, reqMap); + } + ResourceRequest remoteRequest = reqMap.get(capability); + if (remoteRequest == null) { + remoteRequest = BuilderUtils. + newResourceRequest(priority, resourceName, capability, 0); + reqMap.put(capability, remoteRequest); + } + + remoteRequest.setNumContainers(remoteRequest.getNumContainers() + containerCount); + + // Note this down for next interaction with ResourceManager + addResourceRequestToAsk(remoteRequest); + + if (LOG.isDebugEnabled()) { + LOG.debug("addResourceRequest:" + " applicationId=" + + appAttemptId + " priority=" + priority.getPriority() + + " resourceName=" + resourceName + " numContainers=" + + remoteRequest.getNumContainers() + " #asks=" + ask.size()); + } + } + + private void decResourceRequest(Priority priority, String resourceName, + Resource capability, int containerCount) { + Map> remoteRequests = + this.remoteRequestsTable.get(priority); + + if(remoteRequests == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Not decrementing resource as priority " + priority + + " is not present in request table"); + } + return; + } + + Map reqMap = remoteRequests.get(resourceName); + if (reqMap == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Not decrementing resource as " + resourceName + + " is not present in request table"); + } + return; + } + ResourceRequest remoteRequest = reqMap.get(capability); + + if (LOG.isDebugEnabled()) { + LOG.debug("BEFORE decResourceRequest:" + " applicationId=" + + appAttemptId + " priority=" + priority.getPriority() + + " resourceName=" + resourceName + " numContainers=" + + remoteRequest.getNumContainers() + " #asks=" + ask.size()); + } + + remoteRequest. + setNumContainers(remoteRequest.getNumContainers() - containerCount); + if(remoteRequest.getNumContainers() < 0) { + // guard against spurious removals + remoteRequest.setNumContainers(0); + } + // send the ResourceRequest to RM even if is 0 because it needs to override + // a previously sent value. If ResourceRequest was not sent previously then + // sending 0 aught to be a no-op on RM + addResourceRequestToAsk(remoteRequest); + + // delete entries from map if no longer needed + if (remoteRequest.getNumContainers() == 0) { + reqMap.remove(capability); + if (reqMap.size() == 0) { + remoteRequests.remove(resourceName); + } + if (remoteRequests.size() == 0) { + remoteRequestsTable.remove(priority); + } + } + + if (LOG.isDebugEnabled()) { + LOG.info("AFTER decResourceRequest:" + " applicationId=" + + appAttemptId + " priority=" + priority.getPriority() + + " resourceName=" + resourceName + " numContainers=" + + remoteRequest.getNumContainers() + " #asks=" + ask.size()); + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java new file mode 100644 index 00000000000..95fae134b29 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java @@ -0,0 +1,297 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.AMRMProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.AMResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.service.Service.STATE; +import org.apache.hadoop.yarn.util.Records; + +public class TestAMRMClient { + Configuration conf = null; + MiniYARNCluster yarnCluster = null; + YarnClientImpl yarnClient = null; + List nodeReports = null; + ApplicationAttemptId attemptId = null; + int nodeCount = 3; + + @Before + public void setup() throws YarnRemoteException { + // start minicluster + conf = new YarnConfiguration(); + yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1); + yarnCluster.init(conf); + yarnCluster.start(); + + // start rm client + yarnClient = new YarnClientImpl(); + yarnClient.init(conf); + yarnClient.start(); + + // get node info + nodeReports = yarnClient.getNodeReports(); + + // submit new app + GetNewApplicationResponse newApp = yarnClient.getNewApplication(); + ApplicationId appId = newApp.getApplicationId(); + + ApplicationSubmissionContext appContext = Records + .newRecord(ApplicationSubmissionContext.class); + // set the application id + appContext.setApplicationId(appId); + // set the application name + appContext.setApplicationName("Test"); + // Set the priority for the application master + Priority pri = Records.newRecord(Priority.class); + pri.setPriority(0); + appContext.setPriority(pri); + // Set the queue to which this application is to be submitted in the RM + appContext.setQueue("default"); + // Set up the container launch context for the application master + ContainerLaunchContext amContainer = Records + .newRecord(ContainerLaunchContext.class); + appContext.setAMContainerSpec(amContainer); + // unmanaged AM + appContext.setUnmanagedAM(true); + // Create the request to send to the applications manager + SubmitApplicationRequest appRequest = Records + .newRecord(SubmitApplicationRequest.class); + appRequest.setApplicationSubmissionContext(appContext); + // Submit the application to the applications manager + yarnClient.submitApplication(appContext); + + // wait for app to start + while (true) { + ApplicationReport appReport = yarnClient.getApplicationReport(appId); + if (appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) { + attemptId = appReport.getCurrentApplicationAttemptId(); + break; + } + } + } + + @After + public void tearDown() { + if (yarnClient != null && yarnClient.getServiceState() == STATE.STARTED) { + yarnClient.stop(); + } + if (yarnCluster != null && yarnCluster.getServiceState() == STATE.STARTED) { + yarnCluster.stop(); + } + } + + @Test (timeout=60000) + public void testAMRMClient() throws YarnRemoteException { + AMRMClientImpl amClient = null; + try { + // start am rm client + amClient = new AMRMClientImpl(attemptId); + amClient.init(conf); + amClient.start(); + + amClient.registerApplicationMaster("Host", 10000, ""); + + testAllocation(amClient); + + amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, + null, null); + + } finally { + if (amClient != null && amClient.getServiceState() == STATE.STARTED) { + amClient.stop(); + } + } + } + + + private void testAllocation(final AMRMClientImpl amClient) + throws YarnRemoteException { + // setup container request + final Resource capability = Records.newRecord(Resource.class); + final Priority priority = Records.newRecord(Priority.class); + priority.setPriority(0); + capability.setMemory(1024); + String node = nodeReports.get(0).getNodeId().getHost(); + String rack = nodeReports.get(0).getRackName(); + final String[] nodes = { node }; + final String[] racks = { rack }; + + assertTrue(amClient.ask.size() == 0); + assertTrue(amClient.release.size() == 0); + + amClient.addContainerRequest(new ContainerRequest(capability, nodes, + racks, priority, 1)); + amClient.addContainerRequest(new ContainerRequest(capability, nodes, + racks, priority, 3)); + amClient.removeContainerRequest(new ContainerRequest(capability, nodes, + racks, priority, 2)); + + int containersRequestedNode = amClient.remoteRequestsTable.get(priority) + .get(node).get(capability).getNumContainers(); + int containersRequestedRack = amClient.remoteRequestsTable.get(priority) + .get(rack).get(capability).getNumContainers(); + int containersRequestedAny = amClient.remoteRequestsTable.get(priority) + .get(AMRMClient.ANY).get(capability).getNumContainers(); + + assertTrue(containersRequestedNode == 2); + assertTrue(containersRequestedRack == 2); + assertTrue(containersRequestedAny == 2); + assertTrue(amClient.ask.size() == 3); + assertTrue(amClient.release.size() == 0); + + // RM should allocate container within 2 calls to allocate() + int allocatedContainerCount = 0; + int iterationsLeft = 2; + Set releases = new TreeSet(); + while (allocatedContainerCount < containersRequestedAny + && iterationsLeft-- > 0) { + AllocateResponse allocResponse = amClient.allocate(0.1f); + assertTrue(amClient.ask.size() == 0); + assertTrue(amClient.release.size() == 0); + + assertTrue(nodeCount == amClient.getClusterNodeCount()); + AMResponse amResponse = allocResponse.getAMResponse(); + allocatedContainerCount += amResponse.getAllocatedContainers().size(); + for(Container container : amResponse.getAllocatedContainers()) { + ContainerId rejectContainerId = container.getId(); + releases.add(rejectContainerId); + amClient.releaseAssignedContainer(rejectContainerId); + } + if(allocatedContainerCount < containersRequestedAny) { + // sleep to let NM's heartbeat to RM and trigger allocations + sleep(1000); + } + } + + assertTrue(allocatedContainerCount == containersRequestedAny); + assertTrue(amClient.release.size() == 2); + assertTrue(amClient.ask.size() == 0); + + // need to tell the AMRMClient that we dont need these resources anymore + amClient.removeContainerRequest(new ContainerRequest(capability, nodes, + racks, priority, 2)); + assertTrue(amClient.ask.size() == 3); + // send 0 container count request for resources that are no longer needed + ResourceRequest snoopRequest = amClient.ask.iterator().next(); + assertTrue(snoopRequest.getNumContainers() == 0); + + // test RPC exception handling + amClient.addContainerRequest(new ContainerRequest(capability, nodes, + racks, priority, 2)); + snoopRequest = amClient.ask.iterator().next(); + assertTrue(snoopRequest.getNumContainers() == 2); + + AMRMProtocol realRM = amClient.rmClient; + try { + AMRMProtocol mockRM = mock(AMRMProtocol.class); + when(mockRM.allocate(any(AllocateRequest.class))).thenAnswer( + new Answer() { + public AllocateResponse answer(InvocationOnMock invocation) + throws Exception { + amClient.removeContainerRequest(new ContainerRequest(capability, + nodes, racks, priority, 2)); + throw new Exception(); + } + }); + amClient.rmClient = mockRM; + amClient.allocate(0.1f); + }catch (Exception ioe) {} + finally { + amClient.rmClient = realRM; + } + + assertTrue(amClient.release.size() == 2); + assertTrue(amClient.ask.size() == 3); + snoopRequest = amClient.ask.iterator().next(); + // verify that the remove request made in between makeRequest and allocate + // has not been lost + assertTrue(snoopRequest.getNumContainers() == 0); + + iterationsLeft = 2; + // do a few iterations to ensure RM is not going send new containers + while(!releases.isEmpty() || iterationsLeft-- > 0) { + // inform RM of rejection + AllocateResponse allocResponse = amClient.allocate(0.1f); + AMResponse amResponse = allocResponse.getAMResponse(); + // RM did not send new containers because AM does not need any + assertTrue(amResponse.getAllocatedContainers().size() == 0); + if(amResponse.getCompletedContainersStatuses().size() > 0) { + for(ContainerStatus cStatus : amResponse.getCompletedContainersStatuses()) { + if(releases.contains(cStatus.getContainerId())) { + assertTrue(cStatus.getState() == ContainerState.COMPLETE); + assertTrue(cStatus.getExitStatus() == -100); + releases.remove(cStatus.getContainerId()); + } + } + } + if(iterationsLeft > 0) { + // sleep to make sure NM's heartbeat + sleep(1000); + } + } + + assertTrue(amClient.ask.size() == 0); + assertTrue(amClient.release.size() == 0); + } + + private void sleep(int sleepTime) { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + +}