From 8af5c4b24aae932ecc8f09e72fdc7dc8307b6cc5 Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Tue, 23 Aug 2011 01:32:32 +0000 Subject: [PATCH] MAPREDUCE-2649. Handling of finished applications in RM. Contributed by Thomas Graves. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1160521 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce/CHANGES.txt | 3 + .../org/apache/hadoop/mapred/YARNRunner.java | 4 +- .../src/main/resources/yarn-default.xml | 8 + .../resourcemanager/ClientRMService.java | 47 +- .../server/resourcemanager/RMAppManager.java | 171 +++++++ .../resourcemanager/RMAppManagerEvent.java | 18 + .../RMAppManagerEventType.java | 6 + .../RMAppManagerSubmitEvent.java | 18 + .../yarn/server/resourcemanager/RMConfig.java | 5 + .../resourcemanager/ResourceManager.java | 14 +- .../resourcemanager/rmapp/RMAppImpl.java | 7 + .../yarn/server/resourcemanager/MockRM.java | 10 +- .../resourcemanager/TestAppManager.java | 463 ++++++++++++++++++ .../resourcemanager/rmapp/MockRMApp.java | 106 ++++ .../rmapp/TestRMAppTransitions.java | 12 + 15 files changed, 844 insertions(+), 48 deletions(-) create mode 100644 hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java create mode 100644 hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEvent.java create mode 100644 hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java create mode 100644 hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java create mode 100644 hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java create mode 100644 hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java diff --git a/hadoop-mapreduce/CHANGES.txt b/hadoop-mapreduce/CHANGES.txt index fbf710bfb09..04c6d825bec 100644 --- a/hadoop-mapreduce/CHANGES.txt +++ b/hadoop-mapreduce/CHANGES.txt @@ -1127,6 +1127,9 @@ Trunk (unreleased changes) MAPREDUCE-2868. ant build broken in hadoop-mapreduce dir (mahadev, giri and arun via mahadev) + MAPREDUCE-2649. Handling of finished applications in RM. (Thomas Graves + via acmurthy) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index d8a7e4d60c2..6e4d1c56111 100644 --- a/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce/hadoop-mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -244,8 +244,8 @@ public class YARNRunner implements ClientProtocol { ApplicationReport appMaster = resMgrDelegate .getApplicationReport(applicationId); - if (appMaster.getState() == ApplicationState.FAILED || appMaster.getState() == - ApplicationState.KILLED) { + if (appMaster == null || appMaster.getState() == ApplicationState.FAILED + || appMaster.getState() == ApplicationState.KILLED) { throw RPCUtil.getRemoteException("failed to run job"); } return clientServiceDelegate.getJobStatus(jobId); diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml index fa9a74a0a47..eded7652451 100644 --- a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml +++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml @@ -45,6 +45,14 @@ /etc/krb5.keytab + + yarn.server.resourcemanager.expire.applications.completed.max + 10000 + the maximum number of completed applications the RM + keeps in memory + + + diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index df7a9e62072..83878c0cd78 100644 --- a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -36,7 +36,6 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityInfo; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse; @@ -70,10 +69,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo; -import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -97,8 +93,6 @@ public class ClientRMService extends AbstractService implements final private AtomicInteger applicationCounter = new AtomicInteger(0); final private YarnScheduler scheduler; final private RMContext rmContext; - private final ApplicationMasterService masterService; - private final ClientToAMSecretManager clientToAMSecretManager; private final AMLivelinessMonitor amLivelinessMonitor; private String clientServiceBindAddress; @@ -109,15 +103,11 @@ public class ClientRMService extends AbstractService implements private ApplicationACLsManager aclsManager; private Map applicationACLs; - public ClientRMService(RMContext rmContext, - ClientToAMSecretManager clientToAMSecretManager, - YarnScheduler scheduler, ApplicationMasterService masterService) { + public ClientRMService(RMContext rmContext, YarnScheduler scheduler) { super(ClientRMService.class.getName()); this.scheduler = scheduler; this.rmContext = rmContext; - this.masterService = masterService; this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor(); - this.clientToAMSecretManager = clientToAMSecretManager; } @Override @@ -206,42 +196,17 @@ public class ClientRMService extends AbstractService implements ApplicationSubmissionContext submissionContext = request .getApplicationSubmissionContext(); try { - - ApplicationId applicationId = submissionContext.getApplicationId(); - String clientTokenStr = null; String user = UserGroupInformation.getCurrentUser().getShortUserName(); - if (UserGroupInformation.isSecurityEnabled()) { - Token clientToken = new Token( - new ApplicationTokenIdentifier(applicationId), - this.clientToAMSecretManager); - clientTokenStr = clientToken.encodeToUrlString(); - LOG.debug("Sending client token as " + clientTokenStr); - } - - submissionContext.setQueue(submissionContext.getQueue() == null - ? "default" : submissionContext.getQueue()); - submissionContext.setApplicationName(submissionContext - .getApplicationName() == null ? "N/A" : submissionContext - .getApplicationName()); - - ApplicationStore appStore = rmContext.getApplicationsStore() - .createApplicationStore(submissionContext.getApplicationId(), - submissionContext); - RMApp application = new RMAppImpl(applicationId, rmContext, - getConfig(), submissionContext.getApplicationName(), user, - submissionContext.getQueue(), submissionContext, clientTokenStr, - appStore, this.amLivelinessMonitor, this.scheduler, - this.masterService); - if (rmContext.getRMApps().putIfAbsent(applicationId, application) != null) { + ApplicationId applicationId = submissionContext.getApplicationId(); + if (rmContext.getRMApps().get(applicationId) != null) { throw new IOException("Application with id " + applicationId + " is already present! Cannot add a duplicate!"); } - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, RMAppEventType.START)); + new RMAppManagerSubmitEvent(submissionContext)); - LOG.info("Application with id " + applicationId.getId() - + " submitted by user " + user + " with " + submissionContext); + LOG.info("Application with id " + applicationId.getId() + + " submitted by user " + user + " with " + submissionContext); } catch (IOException ie) { LOG.info("Exception in submitting application", ie); throw RPCUtil.getRemoteException(ie); diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java new file mode 100644 index 00000000000..3219d8220d6 --- /dev/null +++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.io.IOException; +import java.util.List; +import java.util.LinkedList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; +import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager; +import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; + +/** + * This class manages the list of applications for the resource manager. + */ +public class RMAppManager implements EventHandler { + + private static final Log LOG = LogFactory.getLog(RMAppManager.class); + + private int completedAppsMax = RMConfig.DEFAULT_EXPIRE_APPLICATIONS_COMPLETED_MAX; + private LinkedList completedApps = new LinkedList(); + + private final RMContext rmContext; + private final ClientToAMSecretManager clientToAMSecretManager; + private final ApplicationMasterService masterService; + private final YarnScheduler scheduler; + private Configuration conf; + + public RMAppManager(RMContext context, ClientToAMSecretManager + clientToAMSecretManager, YarnScheduler scheduler, + ApplicationMasterService masterService, Configuration conf) { + this.rmContext = context; + this.scheduler = scheduler; + this.clientToAMSecretManager = clientToAMSecretManager; + this.masterService = masterService; + this.conf = conf; + setCompletedAppsMax(conf.getInt( + RMConfig.EXPIRE_APPLICATIONS_COMPLETED_MAX, + RMConfig.DEFAULT_EXPIRE_APPLICATIONS_COMPLETED_MAX)); + } + + protected void setCompletedAppsMax(int max) { + this.completedAppsMax = max; + } + + protected synchronized int getCompletedAppsListSize() { + return this.completedApps.size(); + } + + protected synchronized void addCompletedApp(ApplicationId appId) { + if (appId == null) { + LOG.error("RMAppManager received completed appId of null, skipping"); + } else { + completedApps.add(appId); + } + }; + + /* + * check to see if hit the limit for max # completed apps kept + */ + protected synchronized void checkAppNumCompletedLimit() { + while (completedApps.size() > this.completedAppsMax) { + ApplicationId removeId = completedApps.remove(); + LOG.info("Application should be expired, max # apps" + + " met. Removing app: " + removeId); + rmContext.getRMApps().remove(removeId); + } + } + + protected void submitApplication(ApplicationSubmissionContext submissionContext) { + ApplicationId applicationId = submissionContext.getApplicationId(); + RMApp application = null; + try { + String clientTokenStr = null; + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + if (UserGroupInformation.isSecurityEnabled()) { + Token clientToken = new + Token( + new ApplicationTokenIdentifier(applicationId), + this.clientToAMSecretManager); + clientTokenStr = clientToken.encodeToUrlString(); + LOG.debug("Sending client token as " + clientTokenStr); + } + submissionContext.setQueue(submissionContext.getQueue() == null + ? "default" : submissionContext.getQueue()); + submissionContext.setApplicationName(submissionContext + .getApplicationName() == null ? "N/A" : submissionContext + .getApplicationName()); + ApplicationStore appStore = rmContext.getApplicationsStore() + .createApplicationStore(submissionContext.getApplicationId(), + submissionContext); + application = new RMAppImpl(applicationId, rmContext, + this.conf, submissionContext.getApplicationName(), user, + submissionContext.getQueue(), submissionContext, clientTokenStr, + appStore, rmContext.getAMLivelinessMonitor(), this.scheduler, + this.masterService); + + if (rmContext.getRMApps().putIfAbsent(applicationId, application) != null) { + LOG.info("Application with id " + applicationId + + " is already present! Cannot add a duplicate!"); + // don't send event through dispatcher as it will be handled by app already + // present with this id. + application.handle(new RMAppRejectedEvent(applicationId, + "Application with this id is already present! Cannot add a duplicate!")); + } else { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.START)); + } + } catch (IOException ie) { + LOG.info("RMAppManager submit application exception", ie); + if (application != null) { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppRejectedEvent(applicationId, ie.getMessage())); + } + } + } + + @Override + public void handle(RMAppManagerEvent event) { + ApplicationId appID = event.getApplicationId(); + LOG.debug("RMAppManager processing event for " + + appID + " of type " + event.getType()); + switch(event.getType()) { + case APP_COMPLETED: + { + addCompletedApp(appID); + checkAppNumCompletedLimit(); + } + break; + case APP_SUBMIT: + { + ApplicationSubmissionContext submissionContext = + ((RMAppManagerSubmitEvent)event).getSubmissionContext(); + submitApplication(submissionContext); + } + break; + default: + LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); + } + } +} diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEvent.java b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEvent.java new file mode 100644 index 00000000000..7cddc174a37 --- /dev/null +++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEvent.java @@ -0,0 +1,18 @@ +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.event.AbstractEvent; + +public class RMAppManagerEvent extends AbstractEvent { + + private final ApplicationId appId; + + public RMAppManagerEvent(ApplicationId appId, RMAppManagerEventType type) { + super(type); + this.appId = appId; + } + + public ApplicationId getApplicationId() { + return this.appId; + } +} diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java new file mode 100644 index 00000000000..3b52ab14f32 --- /dev/null +++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java @@ -0,0 +1,6 @@ +package org.apache.hadoop.yarn.server.resourcemanager; + +public enum RMAppManagerEventType { + APP_SUBMIT, + APP_COMPLETED +} diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java new file mode 100644 index 00000000000..bc206a20086 --- /dev/null +++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java @@ -0,0 +1,18 @@ +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; + +public class RMAppManagerSubmitEvent extends RMAppManagerEvent { + + private final ApplicationSubmissionContext submissionContext; + + public RMAppManagerSubmitEvent(ApplicationSubmissionContext submissionContext) { + super(submissionContext.getApplicationId(), RMAppManagerEventType.APP_SUBMIT); + this.submissionContext = submissionContext; + } + + public ApplicationSubmissionContext getSubmissionContext() { + return this.submissionContext; + } +} diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java index bf0fbcfc1f8..c3fbf7610e4 100644 --- a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java +++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java @@ -85,4 +85,9 @@ public class RMConfig { public static final String RM_NODES_EXCLUDE_FILE = YarnConfiguration.RM_PREFIX + "nodes.exclude"; public static final String DEFAULT_RM_NODES_EXCLUDE_FILE = ""; + + // the maximum number of completed applications RM keeps + public static String EXPIRE_APPLICATIONS_COMPLETED_MAX = + YarnConfiguration.RM_PREFIX + "expire.applications.completed.max"; + public static final int DEFAULT_EXPIRE_APPLICATIONS_COMPLETED_MAX = 10000; } diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 029e810f619..553b98b52a6 100644 --- a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -98,6 +98,7 @@ public class ResourceManager extends CompositeService implements Recoverable { protected NMLivelinessMonitor nmLivelinessMonitor; protected NodesListManager nodesListManager; private SchedulerEventDispatcher schedulerDispatcher; + private RMAppManager rmAppManager; private final AtomicBoolean shutdown = new AtomicBoolean(false); private WebApp webApp; @@ -176,6 +177,11 @@ public class ResourceManager extends CompositeService implements Recoverable { clientRM = createClientRMService(); addService(clientRM); + + this.rmAppManager = createRMAppManager(); + // Register event handler for RMAppManagerEvents + this.rmDispatcher.register(RMAppManagerEventType.class, + this.rmAppManager); adminService = createAdminService(); addService(adminService); @@ -215,6 +221,11 @@ public class ResourceManager extends CompositeService implements Recoverable { return new AMLivelinessMonitor(this.rmDispatcher); } + protected RMAppManager createRMAppManager() { + return new RMAppManager(this.rmContext, this.clientToAMSecretManager, + this.scheduler, this.masterService, this.conf); + } + @Private public static final class SchedulerEventDispatcher extends AbstractService implements EventHandler { @@ -429,8 +440,7 @@ public class ResourceManager extends CompositeService implements Recoverable { } protected ClientRMService createClientRMService() { - return new ClientRMService(this.rmContext, this.clientToAMSecretManager, - scheduler, masterService); + return new ClientRMService(this.rmContext, scheduler); } protected ApplicationMasterService createApplicationMasterService() { diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index cdab96ba330..97f6e7f515d 100644 --- a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -24,6 +24,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMConfig; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore; +import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; @@ -89,6 +91,8 @@ public class RMAppImpl implements RMApp { RMAppEventType.START, new StartAppAttemptTransition()) .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL, new AppKilledTransition()) + .addTransition(RMAppState.NEW, RMAppState.FAILED, + RMAppEventType.APP_REJECTED, new AppRejectedTransition()) // Transitions from SUBMITTED state .addTransition(RMAppState.SUBMITTED, RMAppState.FAILED, @@ -429,6 +433,9 @@ public class RMAppImpl implements RMApp { new RMNodeCleanAppEvent(nodeId, app.applicationId)); } app.finishTime = System.currentTimeMillis(); + app.dispatcher.getEventHandler().handle( + new RMAppManagerEvent(app.applicationId, + RMAppManagerEventType.APP_COMPLETED)); }; } diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 03d88fd8512..aeb1f81b91c 100644 --- a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -42,8 +42,13 @@ public class MockRM extends ResourceManager { public void waitForState(ApplicationId appId, RMAppState finalState) throws Exception { - RMApp app = getRMContext().getRMApps().get(appId); int timeoutSecs = 0; + RMApp app = null; + while ((app == null) && timeoutSecs++ < 20) { + app = getRMContext().getRMApps().get(appId); + Thread.sleep(500); + } + timeoutSecs = 0; while (!finalState.equals(app.getState()) && timeoutSecs++ < 20) { System.out.println("App State is : " + app.getState() + @@ -108,8 +113,7 @@ public class MockRM extends ResourceManager { @Override protected ClientRMService createClientRMService() { - return new ClientRMService(getRMContext(), - clientToAMSecretManager, getResourceScheduler(), masterService) { + return new ClientRMService(getRMContext(), getResourceScheduler()) { @Override public void start() { //override to not start rpc handler diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java new file mode 100644 index 00000000000..3109198d970 --- /dev/null +++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -0,0 +1,463 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import static org.mockito.Mockito.*; + +import java.util.ArrayList; +import java.util.List; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; + + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.MockApps; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager; +import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager; +import org.apache.hadoop.yarn.server.resourcemanager.RMConfig; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.service.Service; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import com.google.common.collect.Maps; +import com.google.common.collect.Lists; + +/** + * Testing applications being retired from RM. + * + */ + +public class TestAppManager{ + private static final Log LOG = LogFactory.getLog(TestAppManager.class); + private static RMAppEventType appEventType = RMAppEventType.KILL; + + public synchronized RMAppEventType getAppEventType() { + return appEventType; + } + public synchronized void setAppEventType(RMAppEventType newType) { + appEventType = newType; + } + + + public static List newRMApps(int n, long time, RMAppState state) { + List list = Lists.newArrayList(); + for (int i = 0; i < n; ++i) { + list.add(new MockRMApp(i, time, state)); + } + return list; + } + + public static RMContext mockRMContext(int n, long time) { + final List apps = newRMApps(n, time, RMAppState.FINISHED); + final ConcurrentMap map = Maps.newConcurrentMap(); + for (RMApp app : apps) { + map.put(app.getApplicationId(), app); + } + Dispatcher rmDispatcher = new AsyncDispatcher(); + ContainerAllocationExpirer containerAllocationExpirer = new ContainerAllocationExpirer( + rmDispatcher); + AMLivelinessMonitor amLivelinessMonitor = new AMLivelinessMonitor( + rmDispatcher); + return new RMContextImpl(new MemStore(), rmDispatcher, + containerAllocationExpirer, amLivelinessMonitor) { + @Override + public ConcurrentMap getRMApps() { + return map; + } + }; + } + + public class TestAppManagerDispatcher implements + EventHandler { + + private final RMContext rmContext; + + public TestAppManagerDispatcher(RMContext rmContext) { + this.rmContext = rmContext; + } + + @Override + public void handle(RMAppManagerEvent event) { + // do nothing + } + } + + public class TestDispatcher implements + EventHandler { + + private final RMContext rmContext; + + public TestDispatcher(RMContext rmContext) { + this.rmContext = rmContext; + } + + @Override + public void handle(RMAppEvent event) { + ApplicationId appID = event.getApplicationId(); + //RMApp rmApp = this.rmContext.getRMApps().get(appID); + setAppEventType(event.getType()); + System.out.println("in handle routine " + getAppEventType().toString()); + } + } + + + // Extend and make the functions we want to test public + public class TestRMAppManager extends RMAppManager { + + public TestRMAppManager(RMContext context, Configuration conf) { + super(context, null, null, null, conf); + setCompletedAppsMax(RMConfig.DEFAULT_EXPIRE_APPLICATIONS_COMPLETED_MAX); + } + + public TestRMAppManager(RMContext context, ClientToAMSecretManager + clientToAMSecretManager, YarnScheduler scheduler, + ApplicationMasterService masterService, Configuration conf) { + super(context, clientToAMSecretManager, scheduler, masterService, conf); + setCompletedAppsMax(RMConfig.DEFAULT_EXPIRE_APPLICATIONS_COMPLETED_MAX); + } + + public void checkAppNumCompletedLimit() { + super.checkAppNumCompletedLimit(); + } + + public void addCompletedApp(ApplicationId appId) { + super.addCompletedApp(appId); + } + + public int getCompletedAppsListSize() { + return super.getCompletedAppsListSize(); + } + + public void setCompletedAppsMax(int max) { + super.setCompletedAppsMax(max); + } + public void submitApplication(ApplicationSubmissionContext submissionContext) { + super.submitApplication(submissionContext); + } + } + + protected void addToCompletedApps(TestRMAppManager appMonitor, RMContext rmContext) { + for (RMApp app : rmContext.getRMApps().values()) { + if (app.getState() == RMAppState.FINISHED + || app.getState() == RMAppState.KILLED + || app.getState() == RMAppState.FAILED) { + appMonitor.addCompletedApp(app.getApplicationId()); + } + } + } + + @Test + public void testRMAppRetireNone() throws Exception { + long now = System.currentTimeMillis(); + + // Create such that none of the applications will retire since + // haven't hit max # + RMContext rmContext = mockRMContext(10, now - 10); + TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration()); + + appMonitor.setCompletedAppsMax(10); + + Assert.assertEquals("Number of apps incorrect before checkAppTimeLimit", + 10, rmContext.getRMApps().size()); + + // add them to completed apps list + addToCompletedApps(appMonitor, rmContext); + + // shouldn't have to many apps + appMonitor.checkAppNumCompletedLimit(); + Assert.assertEquals("Number of apps incorrect after # completed check", 10, + rmContext.getRMApps().size()); + Assert.assertEquals("Number of completed apps incorrect after check", 10, + appMonitor.getCompletedAppsListSize()); + } + + @Test + public void testRMAppRetireSome() throws Exception { + long now = System.currentTimeMillis(); + + RMContext rmContext = mockRMContext(10, now - 20000); + TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration()); + + appMonitor.setCompletedAppsMax(3); + + Assert.assertEquals("Number of apps incorrect before", 10, rmContext + .getRMApps().size()); + + // add them to completed apps list + addToCompletedApps(appMonitor, rmContext); + + // shouldn't have to many apps + appMonitor.checkAppNumCompletedLimit(); + Assert.assertEquals("Number of apps incorrect after # completed check", 3, + rmContext.getRMApps().size()); + Assert.assertEquals("Number of completed apps incorrect after check", 3, + appMonitor.getCompletedAppsListSize()); + } + + @Test + public void testRMAppRetireSomeDifferentStates() throws Exception { + long now = System.currentTimeMillis(); + + // these parameters don't matter, override applications below + RMContext rmContext = mockRMContext(10, now - 20000); + TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration()); + + appMonitor.setCompletedAppsMax(2); + + // clear out applications map + rmContext.getRMApps().clear(); + Assert.assertEquals("map isn't empty", 0, rmContext.getRMApps().size()); + + // / set with various finished states + RMApp app = new MockRMApp(0, now - 20000, RMAppState.KILLED); + rmContext.getRMApps().put(app.getApplicationId(), app); + app = new MockRMApp(1, now - 200000, RMAppState.FAILED); + rmContext.getRMApps().put(app.getApplicationId(), app); + app = new MockRMApp(2, now - 30000, RMAppState.FINISHED); + rmContext.getRMApps().put(app.getApplicationId(), app); + app = new MockRMApp(3, now - 20000, RMAppState.RUNNING); + rmContext.getRMApps().put(app.getApplicationId(), app); + app = new MockRMApp(4, now - 20000, RMAppState.NEW); + rmContext.getRMApps().put(app.getApplicationId(), app); + + // make sure it doesn't expire these since still running + app = new MockRMApp(5, now - 10001, RMAppState.KILLED); + rmContext.getRMApps().put(app.getApplicationId(), app); + app = new MockRMApp(6, now - 30000, RMAppState.ACCEPTED); + rmContext.getRMApps().put(app.getApplicationId(), app); + app = new MockRMApp(7, now - 20000, RMAppState.SUBMITTED); + rmContext.getRMApps().put(app.getApplicationId(), app); + app = new MockRMApp(8, now - 10001, RMAppState.FAILED); + rmContext.getRMApps().put(app.getApplicationId(), app); + app = new MockRMApp(9, now - 20000, RMAppState.FAILED); + rmContext.getRMApps().put(app.getApplicationId(), app); + + Assert.assertEquals("Number of apps incorrect before", 10, rmContext + .getRMApps().size()); + + // add them to completed apps list + addToCompletedApps(appMonitor, rmContext); + + // shouldn't have to many apps + appMonitor.checkAppNumCompletedLimit(); + Assert.assertEquals("Number of apps incorrect after # completed check", 6, + rmContext.getRMApps().size()); + Assert.assertEquals("Number of completed apps incorrect after check", 2, + appMonitor.getCompletedAppsListSize()); + + } + + @Test + public void testRMAppRetireNullApp() throws Exception { + long now = System.currentTimeMillis(); + + RMContext rmContext = mockRMContext(10, now - 20000); + TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration()); + + Assert.assertEquals("Number of apps incorrect before", 10, rmContext + .getRMApps().size()); + + appMonitor.addCompletedApp(null); + + Assert.assertEquals("Number of completed apps incorrect after check", 0, + appMonitor.getCompletedAppsListSize()); + } + + @Test + public void testRMAppRetireZeroSetting() throws Exception { + long now = System.currentTimeMillis(); + + RMContext rmContext = mockRMContext(10, now - 20000); + TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration()); + + Assert.assertEquals("Number of apps incorrect before", 10, rmContext + .getRMApps().size()); + + // test with 0 + appMonitor.setCompletedAppsMax(0); + + addToCompletedApps(appMonitor, rmContext); + Assert.assertEquals("Number of completed apps incorrect", 10, + appMonitor.getCompletedAppsListSize()); + + appMonitor.checkAppNumCompletedLimit(); + + Assert.assertEquals("Number of apps incorrect after # completed check", 0, + rmContext.getRMApps().size()); + Assert.assertEquals("Number of completed apps incorrect after check", 0, + appMonitor.getCompletedAppsListSize()); + } + + protected void setupDispatcher(RMContext rmContext, Configuration conf) { + TestDispatcher testDispatcher = new TestDispatcher(rmContext); + TestAppManagerDispatcher testAppManagerDispatcher = new TestAppManagerDispatcher(rmContext); + rmContext.getDispatcher().register(RMAppEventType.class, testDispatcher); + rmContext.getDispatcher().register(RMAppManagerEventType.class, testAppManagerDispatcher); + ((Service)rmContext.getDispatcher()).init(conf); + ((Service)rmContext.getDispatcher()).start(); + Assert.assertEquals("app event type is wrong before", RMAppEventType.KILL, appEventType); + } + + @Test + public void testRMAppSubmit() throws Exception { + long now = System.currentTimeMillis(); + + RMContext rmContext = mockRMContext(0, now - 10); + ResourceScheduler scheduler = new CapacityScheduler(); + ApplicationMasterService masterService = new ApplicationMasterService(rmContext, + new ApplicationTokenSecretManager(), scheduler); + Configuration conf = new Configuration(); + TestRMAppManager appMonitor = new TestRMAppManager(rmContext, + new ClientToAMSecretManager(), scheduler, masterService, conf); + + ApplicationId appID = MockApps.newAppID(1); + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class); + context.setApplicationId(appID); + setupDispatcher(rmContext, conf); + + appMonitor.submitApplication(context); + RMApp app = rmContext.getRMApps().get(appID); + Assert.assertNotNull("app is null", app); + Assert.assertEquals("app id doesn't match", appID, app.getApplicationId()); + Assert.assertEquals("app name doesn't match", "N/A", app.getName()); + Assert.assertEquals("app queue doesn't match", "default", app.getQueue()); + Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState()); + Assert.assertNotNull("app store is null", app.getApplicationStore()); + + // wait for event to be processed + int timeoutSecs = 0; + while ((getAppEventType() == RMAppEventType.KILL) && + timeoutSecs++ < 20) { + Thread.sleep(1000); + } + Assert.assertEquals("app event type sent is wrong", RMAppEventType.START, getAppEventType()); + setAppEventType(RMAppEventType.KILL); + ((Service)rmContext.getDispatcher()).stop(); + } + + @Test + public void testRMAppSubmitWithQueueAndName() throws Exception { + long now = System.currentTimeMillis(); + + RMContext rmContext = mockRMContext(1, now - 10); + ResourceScheduler scheduler = new CapacityScheduler(); + ApplicationMasterService masterService = new ApplicationMasterService(rmContext, + new ApplicationTokenSecretManager(), scheduler); + Configuration conf = new Configuration(); + TestRMAppManager appMonitor = new TestRMAppManager(rmContext, + new ClientToAMSecretManager(), scheduler, masterService, conf); + + ApplicationId appID = MockApps.newAppID(10); + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class); + context.setApplicationId(appID); + context.setApplicationName("testApp1"); + context.setQueue("testQueue"); + + setupDispatcher(rmContext, conf); + + appMonitor.submitApplication(context); + RMApp app = rmContext.getRMApps().get(appID); + Assert.assertNotNull("app is null", app); + Assert.assertEquals("app id doesn't match", appID, app.getApplicationId()); + Assert.assertEquals("app name doesn't match", "testApp1", app.getName()); + Assert.assertEquals("app queue doesn't match", "testQueue", app.getQueue()); + Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState()); + Assert.assertNotNull("app store is null", app.getApplicationStore()); + + // wait for event to be processed + int timeoutSecs = 0; + while ((getAppEventType() == RMAppEventType.KILL) && + timeoutSecs++ < 20) { + Thread.sleep(1000); + } + Assert.assertEquals("app event type sent is wrong", RMAppEventType.START, getAppEventType()); + setAppEventType(RMAppEventType.KILL); + ((Service)rmContext.getDispatcher()).stop(); + } + + @Test + public void testRMAppSubmitError() throws Exception { + long now = System.currentTimeMillis(); + + // specify 1 here and use same appId below so it gets duplicate entry + RMContext rmContext = mockRMContext(1, now - 10); + ResourceScheduler scheduler = new CapacityScheduler(); + ApplicationMasterService masterService = new ApplicationMasterService(rmContext, + new ApplicationTokenSecretManager(), scheduler); + Configuration conf = new Configuration(); + TestRMAppManager appMonitor = new TestRMAppManager(rmContext, + new ClientToAMSecretManager(), scheduler, masterService, conf); + + ApplicationId appID = MockApps.newAppID(0); + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class); + context.setApplicationId(appID); + context.setApplicationName("testApp1"); + context.setQueue("testQueue"); + + setupDispatcher(rmContext, conf); + + RMApp appOrig = rmContext.getRMApps().get(appID); + Assert.assertTrue("app name matches but shouldn't", "testApp1" != appOrig.getName()); + + // our testApp1 should be rejected and original app with same id should be left in place + appMonitor.submitApplication(context); + + // make sure original app didn't get removed + RMApp app = rmContext.getRMApps().get(appID); + Assert.assertNotNull("app is null", app); + Assert.assertEquals("app id doesn't match", appID, app.getApplicationId()); + Assert.assertEquals("app name doesn't matches", appOrig.getName(), app.getName()); + ((Service)rmContext.getDispatcher()).stop(); + } + +} diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java new file mode 100644 index 00000000000..0ef02381afa --- /dev/null +++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -0,0 +1,106 @@ +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.MockApps; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; + +public class MockRMApp implements RMApp { + static final int DT = 1000000; // ms + + String user = MockApps.newUserName(); + String name = MockApps.newAppName(); + String queue = MockApps.newQueue(); + long start = System.currentTimeMillis() - (int) (Math.random() * DT); + long finish = 0; + RMAppState state = RMAppState.NEW; + int failCount = 0; + ApplicationId id; + + public MockRMApp(int newid, long time, RMAppState newState) { + finish = time; + id = MockApps.newAppID(newid); + state = newState; + } + + public MockRMApp(int newid, long time, RMAppState newState, String userName) { + this(newid, time, newState); + user = userName; + } + + @Override + public ApplicationId getApplicationId() { + return id; + } + + @Override + public RMAppState getState() { + return state; + } + + @Override + public String getUser() { + return user; + } + + @Override + public float getProgress() { + return (float) 0.0; + } + + @Override + public RMAppAttempt getRMAppAttempt(ApplicationAttemptId appAttemptId) { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public String getQueue() { + return queue; + } + + @Override + public String getName() { + return name; + } + + @Override + public RMAppAttempt getCurrentAppAttempt() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public ApplicationReport createAndGetApplicationReport() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public ApplicationStore getApplicationStore() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public long getFinishTime() { + return finish; + } + + @Override + public long getStartTime() { + return start; + } + + @Override + public String getTrackingUrl() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public StringBuilder getDiagnostics() { + throw new UnsupportedOperationException("Not supported yet."); + } + + public void handle(RMAppEvent event) { + }; + +} diff --git a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index a489ced3d4f..0614f8107b2 100644 --- a/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-mapreduce/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -262,6 +262,18 @@ public class TestRMAppTransitions { assertKilled(application); } + @Test + public void testAppNewReject() throws IOException { + LOG.info("--- START: testAppNewReject ---"); + + RMApp application = createNewTestApp(); + // NEW => FAILED event RMAppEventType.APP_REJECTED + String rejectedText = "Test Application Rejected"; + RMAppEvent event = new RMAppRejectedEvent(application.getApplicationId(), rejectedText); + application.handle(event); + assertFailed(application, rejectedText); + } + @Test public void testAppSubmittedRejected() throws IOException { LOG.info("--- START: testAppSubmittedRejected ---");