MAPREDUCE-2649. Handling of finished applications in RM. Contributed by Thomas Graves.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1160521 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2892f6d817
commit
8af5c4b24a
|
@ -1127,6 +1127,9 @@ Trunk (unreleased changes)
|
||||||
|
|
||||||
MAPREDUCE-2868. ant build broken in hadoop-mapreduce dir (mahadev, giri and arun via mahadev)
|
MAPREDUCE-2868. ant build broken in hadoop-mapreduce dir (mahadev, giri and arun via mahadev)
|
||||||
|
|
||||||
|
MAPREDUCE-2649. Handling of finished applications in RM. (Thomas Graves
|
||||||
|
via acmurthy)
|
||||||
|
|
||||||
Release 0.22.0 - Unreleased
|
Release 0.22.0 - Unreleased
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -244,8 +244,8 @@ public class YARNRunner implements ClientProtocol {
|
||||||
|
|
||||||
ApplicationReport appMaster = resMgrDelegate
|
ApplicationReport appMaster = resMgrDelegate
|
||||||
.getApplicationReport(applicationId);
|
.getApplicationReport(applicationId);
|
||||||
if (appMaster.getState() == ApplicationState.FAILED || appMaster.getState() ==
|
if (appMaster == null || appMaster.getState() == ApplicationState.FAILED
|
||||||
ApplicationState.KILLED) {
|
|| appMaster.getState() == ApplicationState.KILLED) {
|
||||||
throw RPCUtil.getRemoteException("failed to run job");
|
throw RPCUtil.getRemoteException("failed to run job");
|
||||||
}
|
}
|
||||||
return clientServiceDelegate.getJobStatus(jobId);
|
return clientServiceDelegate.getJobStatus(jobId);
|
||||||
|
|
|
@ -45,6 +45,14 @@
|
||||||
<value>/etc/krb5.keytab</value>
|
<value>/etc/krb5.keytab</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>yarn.server.resourcemanager.expire.applications.completed.max</name>
|
||||||
|
<value>10000</value>
|
||||||
|
<description>The maximum number of completed applications the RM
|
||||||
|
keeps in memory. Once this limit is exceeded, the oldest completed
applications are removed from the RM.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<!-- All nodemanager related configuration properties -->
|
<!-- All nodemanager related configuration properties -->
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
|
|
|
@ -36,7 +36,6 @@ import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.SecurityInfo;
|
import org.apache.hadoop.security.SecurityInfo;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||||
import org.apache.hadoop.security.token.Token;
|
|
||||||
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse;
|
||||||
|
@ -70,10 +69,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
|
|
||||||
import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
|
import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
|
||||||
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||||
|
@ -97,8 +93,6 @@ public class ClientRMService extends AbstractService implements
|
||||||
final private AtomicInteger applicationCounter = new AtomicInteger(0);
|
final private AtomicInteger applicationCounter = new AtomicInteger(0);
|
||||||
final private YarnScheduler scheduler;
|
final private YarnScheduler scheduler;
|
||||||
final private RMContext rmContext;
|
final private RMContext rmContext;
|
||||||
private final ApplicationMasterService masterService;
|
|
||||||
private final ClientToAMSecretManager clientToAMSecretManager;
|
|
||||||
private final AMLivelinessMonitor amLivelinessMonitor;
|
private final AMLivelinessMonitor amLivelinessMonitor;
|
||||||
|
|
||||||
private String clientServiceBindAddress;
|
private String clientServiceBindAddress;
|
||||||
|
@ -109,15 +103,11 @@ public class ClientRMService extends AbstractService implements
|
||||||
private ApplicationACLsManager aclsManager;
|
private ApplicationACLsManager aclsManager;
|
||||||
private Map<ApplicationACL, AccessControlList> applicationACLs;
|
private Map<ApplicationACL, AccessControlList> applicationACLs;
|
||||||
|
|
||||||
public ClientRMService(RMContext rmContext,
|
public ClientRMService(RMContext rmContext, YarnScheduler scheduler) {
|
||||||
ClientToAMSecretManager clientToAMSecretManager,
|
|
||||||
YarnScheduler scheduler, ApplicationMasterService masterService) {
|
|
||||||
super(ClientRMService.class.getName());
|
super(ClientRMService.class.getName());
|
||||||
this.scheduler = scheduler;
|
this.scheduler = scheduler;
|
||||||
this.rmContext = rmContext;
|
this.rmContext = rmContext;
|
||||||
this.masterService = masterService;
|
|
||||||
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
|
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
|
||||||
this.clientToAMSecretManager = clientToAMSecretManager;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -206,42 +196,17 @@ public class ClientRMService extends AbstractService implements
|
||||||
ApplicationSubmissionContext submissionContext = request
|
ApplicationSubmissionContext submissionContext = request
|
||||||
.getApplicationSubmissionContext();
|
.getApplicationSubmissionContext();
|
||||||
try {
|
try {
|
||||||
|
|
||||||
ApplicationId applicationId = submissionContext.getApplicationId();
|
|
||||||
String clientTokenStr = null;
|
|
||||||
String user = UserGroupInformation.getCurrentUser().getShortUserName();
|
String user = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
ApplicationId applicationId = submissionContext.getApplicationId();
|
||||||
Token<ApplicationTokenIdentifier> clientToken = new Token<ApplicationTokenIdentifier>(
|
if (rmContext.getRMApps().get(applicationId) != null) {
|
||||||
new ApplicationTokenIdentifier(applicationId),
|
|
||||||
this.clientToAMSecretManager);
|
|
||||||
clientTokenStr = clientToken.encodeToUrlString();
|
|
||||||
LOG.debug("Sending client token as " + clientTokenStr);
|
|
||||||
}
|
|
||||||
|
|
||||||
submissionContext.setQueue(submissionContext.getQueue() == null
|
|
||||||
? "default" : submissionContext.getQueue());
|
|
||||||
submissionContext.setApplicationName(submissionContext
|
|
||||||
.getApplicationName() == null ? "N/A" : submissionContext
|
|
||||||
.getApplicationName());
|
|
||||||
|
|
||||||
ApplicationStore appStore = rmContext.getApplicationsStore()
|
|
||||||
.createApplicationStore(submissionContext.getApplicationId(),
|
|
||||||
submissionContext);
|
|
||||||
RMApp application = new RMAppImpl(applicationId, rmContext,
|
|
||||||
getConfig(), submissionContext.getApplicationName(), user,
|
|
||||||
submissionContext.getQueue(), submissionContext, clientTokenStr,
|
|
||||||
appStore, this.amLivelinessMonitor, this.scheduler,
|
|
||||||
this.masterService);
|
|
||||||
if (rmContext.getRMApps().putIfAbsent(applicationId, application) != null) {
|
|
||||||
throw new IOException("Application with id " + applicationId
|
throw new IOException("Application with id " + applicationId
|
||||||
+ " is already present! Cannot add a duplicate!");
|
+ " is already present! Cannot add a duplicate!");
|
||||||
}
|
}
|
||||||
|
|
||||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||||
new RMAppEvent(applicationId, RMAppEventType.START));
|
new RMAppManagerSubmitEvent(submissionContext));
|
||||||
|
|
||||||
LOG.info("Application with id " + applicationId.getId()
|
LOG.info("Application with id " + applicationId.getId() +
|
||||||
+ " submitted by user " + user + " with " + submissionContext);
|
" submitted by user " + user + " with " + submissionContext);
|
||||||
} catch (IOException ie) {
|
} catch (IOException ie) {
|
||||||
LOG.info("Exception in submitting application", ie);
|
LOG.info("Exception in submitting application", ie);
|
||||||
throw RPCUtil.getRemoteException(ie);
|
throw RPCUtil.getRemoteException(ie);
|
||||||
|
|
|
@ -0,0 +1,171 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
|
||||||
|
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class manages the list of applications for the resource manager.
|
||||||
|
*/
|
||||||
|
public class RMAppManager implements EventHandler<RMAppManagerEvent> {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(RMAppManager.class);
|
||||||
|
|
||||||
|
private int completedAppsMax = RMConfig.DEFAULT_EXPIRE_APPLICATIONS_COMPLETED_MAX;
|
||||||
|
private LinkedList<ApplicationId> completedApps = new LinkedList<ApplicationId>();
|
||||||
|
|
||||||
|
private final RMContext rmContext;
|
||||||
|
private final ClientToAMSecretManager clientToAMSecretManager;
|
||||||
|
private final ApplicationMasterService masterService;
|
||||||
|
private final YarnScheduler scheduler;
|
||||||
|
private Configuration conf;
|
||||||
|
|
||||||
|
public RMAppManager(RMContext context, ClientToAMSecretManager
|
||||||
|
clientToAMSecretManager, YarnScheduler scheduler,
|
||||||
|
ApplicationMasterService masterService, Configuration conf) {
|
||||||
|
this.rmContext = context;
|
||||||
|
this.scheduler = scheduler;
|
||||||
|
this.clientToAMSecretManager = clientToAMSecretManager;
|
||||||
|
this.masterService = masterService;
|
||||||
|
this.conf = conf;
|
||||||
|
setCompletedAppsMax(conf.getInt(
|
||||||
|
RMConfig.EXPIRE_APPLICATIONS_COMPLETED_MAX,
|
||||||
|
RMConfig.DEFAULT_EXPIRE_APPLICATIONS_COMPLETED_MAX));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void setCompletedAppsMax(int max) {
|
||||||
|
this.completedAppsMax = max;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected synchronized int getCompletedAppsListSize() {
|
||||||
|
return this.completedApps.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected synchronized void addCompletedApp(ApplicationId appId) {
|
||||||
|
if (appId == null) {
|
||||||
|
LOG.error("RMAppManager received completed appId of null, skipping");
|
||||||
|
} else {
|
||||||
|
completedApps.add(appId);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/*
|
||||||
|
* check to see if hit the limit for max # completed apps kept
|
||||||
|
*/
|
||||||
|
protected synchronized void checkAppNumCompletedLimit() {
|
||||||
|
while (completedApps.size() > this.completedAppsMax) {
|
||||||
|
ApplicationId removeId = completedApps.remove();
|
||||||
|
LOG.info("Application should be expired, max # apps"
|
||||||
|
+ " met. Removing app: " + removeId);
|
||||||
|
rmContext.getRMApps().remove(removeId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void submitApplication(ApplicationSubmissionContext submissionContext) {
|
||||||
|
ApplicationId applicationId = submissionContext.getApplicationId();
|
||||||
|
RMApp application = null;
|
||||||
|
try {
|
||||||
|
String clientTokenStr = null;
|
||||||
|
String user = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||||
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
Token<ApplicationTokenIdentifier> clientToken = new
|
||||||
|
Token<ApplicationTokenIdentifier>(
|
||||||
|
new ApplicationTokenIdentifier(applicationId),
|
||||||
|
this.clientToAMSecretManager);
|
||||||
|
clientTokenStr = clientToken.encodeToUrlString();
|
||||||
|
LOG.debug("Sending client token as " + clientTokenStr);
|
||||||
|
}
|
||||||
|
submissionContext.setQueue(submissionContext.getQueue() == null
|
||||||
|
? "default" : submissionContext.getQueue());
|
||||||
|
submissionContext.setApplicationName(submissionContext
|
||||||
|
.getApplicationName() == null ? "N/A" : submissionContext
|
||||||
|
.getApplicationName());
|
||||||
|
ApplicationStore appStore = rmContext.getApplicationsStore()
|
||||||
|
.createApplicationStore(submissionContext.getApplicationId(),
|
||||||
|
submissionContext);
|
||||||
|
application = new RMAppImpl(applicationId, rmContext,
|
||||||
|
this.conf, submissionContext.getApplicationName(), user,
|
||||||
|
submissionContext.getQueue(), submissionContext, clientTokenStr,
|
||||||
|
appStore, rmContext.getAMLivelinessMonitor(), this.scheduler,
|
||||||
|
this.masterService);
|
||||||
|
|
||||||
|
if (rmContext.getRMApps().putIfAbsent(applicationId, application) != null) {
|
||||||
|
LOG.info("Application with id " + applicationId +
|
||||||
|
" is already present! Cannot add a duplicate!");
|
||||||
|
// don't send event through dispatcher as it will be handled by app already
|
||||||
|
// present with this id.
|
||||||
|
application.handle(new RMAppRejectedEvent(applicationId,
|
||||||
|
"Application with this id is already present! Cannot add a duplicate!"));
|
||||||
|
} else {
|
||||||
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||||
|
new RMAppEvent(applicationId, RMAppEventType.START));
|
||||||
|
}
|
||||||
|
} catch (IOException ie) {
|
||||||
|
LOG.info("RMAppManager submit application exception", ie);
|
||||||
|
if (application != null) {
|
||||||
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||||
|
new RMAppRejectedEvent(applicationId, ie.getMessage()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handle(RMAppManagerEvent event) {
|
||||||
|
ApplicationId appID = event.getApplicationId();
|
||||||
|
LOG.debug("RMAppManager processing event for "
|
||||||
|
+ appID + " of type " + event.getType());
|
||||||
|
switch(event.getType()) {
|
||||||
|
case APP_COMPLETED:
|
||||||
|
{
|
||||||
|
addCompletedApp(appID);
|
||||||
|
checkAppNumCompletedLimit();
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case APP_SUBMIT:
|
||||||
|
{
|
||||||
|
ApplicationSubmissionContext submissionContext =
|
||||||
|
((RMAppManagerSubmitEvent)event).getSubmissionContext();
|
||||||
|
submitApplication(submissionContext);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,18 @@
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.event.AbstractEvent;
|
||||||
|
|
||||||
|
public class RMAppManagerEvent extends AbstractEvent<RMAppManagerEventType> {
|
||||||
|
|
||||||
|
private final ApplicationId appId;
|
||||||
|
|
||||||
|
public RMAppManagerEvent(ApplicationId appId, RMAppManagerEventType type) {
|
||||||
|
super(type);
|
||||||
|
this.appId = appId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ApplicationId getApplicationId() {
|
||||||
|
return this.appId;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,6 @@
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
/**
 * Types of events handled by the RM application manager.
 */
public enum RMAppManagerEventType {

  /** An application has been submitted and should be created and started. */
  APP_SUBMIT,

  /** An application has reached a final state and should be recorded as completed. */
  APP_COMPLETED
}
|
|
@ -0,0 +1,18 @@
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
|
||||||
|
public class RMAppManagerSubmitEvent extends RMAppManagerEvent {
|
||||||
|
|
||||||
|
private final ApplicationSubmissionContext submissionContext;
|
||||||
|
|
||||||
|
public RMAppManagerSubmitEvent(ApplicationSubmissionContext submissionContext) {
|
||||||
|
super(submissionContext.getApplicationId(), RMAppManagerEventType.APP_SUBMIT);
|
||||||
|
this.submissionContext = submissionContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ApplicationSubmissionContext getSubmissionContext() {
|
||||||
|
return this.submissionContext;
|
||||||
|
}
|
||||||
|
}
|
|
@ -85,4 +85,9 @@ public class RMConfig {
|
||||||
public static final String RM_NODES_EXCLUDE_FILE =
|
public static final String RM_NODES_EXCLUDE_FILE =
|
||||||
YarnConfiguration.RM_PREFIX + "nodes.exclude";
|
YarnConfiguration.RM_PREFIX + "nodes.exclude";
|
||||||
public static final String DEFAULT_RM_NODES_EXCLUDE_FILE = "";
|
public static final String DEFAULT_RM_NODES_EXCLUDE_FILE = "";
|
||||||
|
|
||||||
|
// the maximum number of completed applications RM keeps
|
||||||
|
public static String EXPIRE_APPLICATIONS_COMPLETED_MAX =
|
||||||
|
YarnConfiguration.RM_PREFIX + "expire.applications.completed.max";
|
||||||
|
public static final int DEFAULT_EXPIRE_APPLICATIONS_COMPLETED_MAX = 10000;
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,6 +98,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
protected NMLivelinessMonitor nmLivelinessMonitor;
|
protected NMLivelinessMonitor nmLivelinessMonitor;
|
||||||
protected NodesListManager nodesListManager;
|
protected NodesListManager nodesListManager;
|
||||||
private SchedulerEventDispatcher schedulerDispatcher;
|
private SchedulerEventDispatcher schedulerDispatcher;
|
||||||
|
private RMAppManager rmAppManager;
|
||||||
|
|
||||||
private final AtomicBoolean shutdown = new AtomicBoolean(false);
|
private final AtomicBoolean shutdown = new AtomicBoolean(false);
|
||||||
private WebApp webApp;
|
private WebApp webApp;
|
||||||
|
@ -176,6 +177,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
|
|
||||||
clientRM = createClientRMService();
|
clientRM = createClientRMService();
|
||||||
addService(clientRM);
|
addService(clientRM);
|
||||||
|
|
||||||
|
this.rmAppManager = createRMAppManager();
|
||||||
|
// Register event handler for RMAppManagerEvents
|
||||||
|
this.rmDispatcher.register(RMAppManagerEventType.class,
|
||||||
|
this.rmAppManager);
|
||||||
|
|
||||||
adminService = createAdminService();
|
adminService = createAdminService();
|
||||||
addService(adminService);
|
addService(adminService);
|
||||||
|
@ -215,6 +221,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
return new AMLivelinessMonitor(this.rmDispatcher);
|
return new AMLivelinessMonitor(this.rmDispatcher);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected RMAppManager createRMAppManager() {
|
||||||
|
return new RMAppManager(this.rmContext, this.clientToAMSecretManager,
|
||||||
|
this.scheduler, this.masterService, this.conf);
|
||||||
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public static final class SchedulerEventDispatcher extends AbstractService
|
public static final class SchedulerEventDispatcher extends AbstractService
|
||||||
implements EventHandler<SchedulerEvent> {
|
implements EventHandler<SchedulerEvent> {
|
||||||
|
@ -429,8 +440,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ClientRMService createClientRMService() {
|
protected ClientRMService createClientRMService() {
|
||||||
return new ClientRMService(this.rmContext, this.clientToAMSecretManager,
|
return new ClientRMService(this.rmContext, scheduler);
|
||||||
scheduler, masterService);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ApplicationMasterService createApplicationMasterService() {
|
protected ApplicationMasterService createApplicationMasterService() {
|
||||||
|
|
|
@ -24,6 +24,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||||
|
@ -89,6 +91,8 @@ public class RMAppImpl implements RMApp {
|
||||||
RMAppEventType.START, new StartAppAttemptTransition())
|
RMAppEventType.START, new StartAppAttemptTransition())
|
||||||
.addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
|
.addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
|
||||||
new AppKilledTransition())
|
new AppKilledTransition())
|
||||||
|
.addTransition(RMAppState.NEW, RMAppState.FAILED,
|
||||||
|
RMAppEventType.APP_REJECTED, new AppRejectedTransition())
|
||||||
|
|
||||||
// Transitions from SUBMITTED state
|
// Transitions from SUBMITTED state
|
||||||
.addTransition(RMAppState.SUBMITTED, RMAppState.FAILED,
|
.addTransition(RMAppState.SUBMITTED, RMAppState.FAILED,
|
||||||
|
@ -429,6 +433,9 @@ public class RMAppImpl implements RMApp {
|
||||||
new RMNodeCleanAppEvent(nodeId, app.applicationId));
|
new RMNodeCleanAppEvent(nodeId, app.applicationId));
|
||||||
}
|
}
|
||||||
app.finishTime = System.currentTimeMillis();
|
app.finishTime = System.currentTimeMillis();
|
||||||
|
app.dispatcher.getEventHandler().handle(
|
||||||
|
new RMAppManagerEvent(app.applicationId,
|
||||||
|
RMAppManagerEventType.APP_COMPLETED));
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -42,8 +42,13 @@ public class MockRM extends ResourceManager {
|
||||||
|
|
||||||
public void waitForState(ApplicationId appId, RMAppState finalState)
|
public void waitForState(ApplicationId appId, RMAppState finalState)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
RMApp app = getRMContext().getRMApps().get(appId);
|
|
||||||
int timeoutSecs = 0;
|
int timeoutSecs = 0;
|
||||||
|
RMApp app = null;
|
||||||
|
while ((app == null) && timeoutSecs++ < 20) {
|
||||||
|
app = getRMContext().getRMApps().get(appId);
|
||||||
|
Thread.sleep(500);
|
||||||
|
}
|
||||||
|
timeoutSecs = 0;
|
||||||
while (!finalState.equals(app.getState()) &&
|
while (!finalState.equals(app.getState()) &&
|
||||||
timeoutSecs++ < 20) {
|
timeoutSecs++ < 20) {
|
||||||
System.out.println("App State is : " + app.getState() +
|
System.out.println("App State is : " + app.getState() +
|
||||||
|
@ -108,8 +113,7 @@ public class MockRM extends ResourceManager {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ClientRMService createClientRMService() {
|
protected ClientRMService createClientRMService() {
|
||||||
return new ClientRMService(getRMContext(),
|
return new ClientRMService(getRMContext(), getResourceScheduler()) {
|
||||||
clientToAMSecretManager, getResourceScheduler(), masterService) {
|
|
||||||
@Override
|
@Override
|
||||||
public void start() {
|
public void start() {
|
||||||
//override to not start rpc handler
|
//override to not start rpc handler
|
||||||
|
|
|
@ -0,0 +1,463 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.MockApps;
|
||||||
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
|
||||||
|
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
import org.apache.hadoop.yarn.service.Service;
|
||||||
|
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Testing applications being retired from RM.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
public class TestAppManager{
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestAppManager.class);
|
||||||
|
private static RMAppEventType appEventType = RMAppEventType.KILL;
|
||||||
|
|
||||||
|
public synchronized RMAppEventType getAppEventType() {
|
||||||
|
return appEventType;
|
||||||
|
}
|
||||||
|
public synchronized void setAppEventType(RMAppEventType newType) {
|
||||||
|
appEventType = newType;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static List<RMApp> newRMApps(int n, long time, RMAppState state) {
|
||||||
|
List<RMApp> list = Lists.newArrayList();
|
||||||
|
for (int i = 0; i < n; ++i) {
|
||||||
|
list.add(new MockRMApp(i, time, state));
|
||||||
|
}
|
||||||
|
return list;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static RMContext mockRMContext(int n, long time) {
|
||||||
|
final List<RMApp> apps = newRMApps(n, time, RMAppState.FINISHED);
|
||||||
|
final ConcurrentMap<ApplicationId, RMApp> map = Maps.newConcurrentMap();
|
||||||
|
for (RMApp app : apps) {
|
||||||
|
map.put(app.getApplicationId(), app);
|
||||||
|
}
|
||||||
|
Dispatcher rmDispatcher = new AsyncDispatcher();
|
||||||
|
ContainerAllocationExpirer containerAllocationExpirer = new ContainerAllocationExpirer(
|
||||||
|
rmDispatcher);
|
||||||
|
AMLivelinessMonitor amLivelinessMonitor = new AMLivelinessMonitor(
|
||||||
|
rmDispatcher);
|
||||||
|
return new RMContextImpl(new MemStore(), rmDispatcher,
|
||||||
|
containerAllocationExpirer, amLivelinessMonitor) {
|
||||||
|
@Override
|
||||||
|
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
public class TestAppManagerDispatcher implements
|
||||||
|
EventHandler<RMAppManagerEvent> {
|
||||||
|
|
||||||
|
private final RMContext rmContext;
|
||||||
|
|
||||||
|
public TestAppManagerDispatcher(RMContext rmContext) {
|
||||||
|
this.rmContext = rmContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handle(RMAppManagerEvent event) {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public class TestDispatcher implements
|
||||||
|
EventHandler<RMAppEvent> {
|
||||||
|
|
||||||
|
private final RMContext rmContext;
|
||||||
|
|
||||||
|
public TestDispatcher(RMContext rmContext) {
|
||||||
|
this.rmContext = rmContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handle(RMAppEvent event) {
|
||||||
|
ApplicationId appID = event.getApplicationId();
|
||||||
|
//RMApp rmApp = this.rmContext.getRMApps().get(appID);
|
||||||
|
setAppEventType(event.getType());
|
||||||
|
System.out.println("in handle routine " + getAppEventType().toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Extend and make the functions we want to test public
|
||||||
|
public class TestRMAppManager extends RMAppManager {
|
||||||
|
|
||||||
|
public TestRMAppManager(RMContext context, Configuration conf) {
|
||||||
|
super(context, null, null, null, conf);
|
||||||
|
setCompletedAppsMax(RMConfig.DEFAULT_EXPIRE_APPLICATIONS_COMPLETED_MAX);
|
||||||
|
}
|
||||||
|
|
||||||
|
public TestRMAppManager(RMContext context, ClientToAMSecretManager
|
||||||
|
clientToAMSecretManager, YarnScheduler scheduler,
|
||||||
|
ApplicationMasterService masterService, Configuration conf) {
|
||||||
|
super(context, clientToAMSecretManager, scheduler, masterService, conf);
|
||||||
|
setCompletedAppsMax(RMConfig.DEFAULT_EXPIRE_APPLICATIONS_COMPLETED_MAX);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void checkAppNumCompletedLimit() {
|
||||||
|
super.checkAppNumCompletedLimit();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addCompletedApp(ApplicationId appId) {
|
||||||
|
super.addCompletedApp(appId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getCompletedAppsListSize() {
|
||||||
|
return super.getCompletedAppsListSize();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCompletedAppsMax(int max) {
|
||||||
|
super.setCompletedAppsMax(max);
|
||||||
|
}
|
||||||
|
public void submitApplication(ApplicationSubmissionContext submissionContext) {
|
||||||
|
super.submitApplication(submissionContext);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void addToCompletedApps(TestRMAppManager appMonitor, RMContext rmContext) {
|
||||||
|
for (RMApp app : rmContext.getRMApps().values()) {
|
||||||
|
if (app.getState() == RMAppState.FINISHED
|
||||||
|
|| app.getState() == RMAppState.KILLED
|
||||||
|
|| app.getState() == RMAppState.FAILED) {
|
||||||
|
appMonitor.addCompletedApp(app.getApplicationId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRMAppRetireNone() throws Exception {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
|
||||||
|
// Create such that none of the applications will retire since
|
||||||
|
// haven't hit max #
|
||||||
|
RMContext rmContext = mockRMContext(10, now - 10);
|
||||||
|
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
|
||||||
|
|
||||||
|
appMonitor.setCompletedAppsMax(10);
|
||||||
|
|
||||||
|
Assert.assertEquals("Number of apps incorrect before checkAppTimeLimit",
|
||||||
|
10, rmContext.getRMApps().size());
|
||||||
|
|
||||||
|
// add them to completed apps list
|
||||||
|
addToCompletedApps(appMonitor, rmContext);
|
||||||
|
|
||||||
|
// shouldn't have to many apps
|
||||||
|
appMonitor.checkAppNumCompletedLimit();
|
||||||
|
Assert.assertEquals("Number of apps incorrect after # completed check", 10,
|
||||||
|
rmContext.getRMApps().size());
|
||||||
|
Assert.assertEquals("Number of completed apps incorrect after check", 10,
|
||||||
|
appMonitor.getCompletedAppsListSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRMAppRetireSome() throws Exception {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
|
||||||
|
RMContext rmContext = mockRMContext(10, now - 20000);
|
||||||
|
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
|
||||||
|
|
||||||
|
appMonitor.setCompletedAppsMax(3);
|
||||||
|
|
||||||
|
Assert.assertEquals("Number of apps incorrect before", 10, rmContext
|
||||||
|
.getRMApps().size());
|
||||||
|
|
||||||
|
// add them to completed apps list
|
||||||
|
addToCompletedApps(appMonitor, rmContext);
|
||||||
|
|
||||||
|
// shouldn't have to many apps
|
||||||
|
appMonitor.checkAppNumCompletedLimit();
|
||||||
|
Assert.assertEquals("Number of apps incorrect after # completed check", 3,
|
||||||
|
rmContext.getRMApps().size());
|
||||||
|
Assert.assertEquals("Number of completed apps incorrect after check", 3,
|
||||||
|
appMonitor.getCompletedAppsListSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRMAppRetireSomeDifferentStates() throws Exception {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
|
||||||
|
// these parameters don't matter, override applications below
|
||||||
|
RMContext rmContext = mockRMContext(10, now - 20000);
|
||||||
|
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
|
||||||
|
|
||||||
|
appMonitor.setCompletedAppsMax(2);
|
||||||
|
|
||||||
|
// clear out applications map
|
||||||
|
rmContext.getRMApps().clear();
|
||||||
|
Assert.assertEquals("map isn't empty", 0, rmContext.getRMApps().size());
|
||||||
|
|
||||||
|
// / set with various finished states
|
||||||
|
RMApp app = new MockRMApp(0, now - 20000, RMAppState.KILLED);
|
||||||
|
rmContext.getRMApps().put(app.getApplicationId(), app);
|
||||||
|
app = new MockRMApp(1, now - 200000, RMAppState.FAILED);
|
||||||
|
rmContext.getRMApps().put(app.getApplicationId(), app);
|
||||||
|
app = new MockRMApp(2, now - 30000, RMAppState.FINISHED);
|
||||||
|
rmContext.getRMApps().put(app.getApplicationId(), app);
|
||||||
|
app = new MockRMApp(3, now - 20000, RMAppState.RUNNING);
|
||||||
|
rmContext.getRMApps().put(app.getApplicationId(), app);
|
||||||
|
app = new MockRMApp(4, now - 20000, RMAppState.NEW);
|
||||||
|
rmContext.getRMApps().put(app.getApplicationId(), app);
|
||||||
|
|
||||||
|
// make sure it doesn't expire these since still running
|
||||||
|
app = new MockRMApp(5, now - 10001, RMAppState.KILLED);
|
||||||
|
rmContext.getRMApps().put(app.getApplicationId(), app);
|
||||||
|
app = new MockRMApp(6, now - 30000, RMAppState.ACCEPTED);
|
||||||
|
rmContext.getRMApps().put(app.getApplicationId(), app);
|
||||||
|
app = new MockRMApp(7, now - 20000, RMAppState.SUBMITTED);
|
||||||
|
rmContext.getRMApps().put(app.getApplicationId(), app);
|
||||||
|
app = new MockRMApp(8, now - 10001, RMAppState.FAILED);
|
||||||
|
rmContext.getRMApps().put(app.getApplicationId(), app);
|
||||||
|
app = new MockRMApp(9, now - 20000, RMAppState.FAILED);
|
||||||
|
rmContext.getRMApps().put(app.getApplicationId(), app);
|
||||||
|
|
||||||
|
Assert.assertEquals("Number of apps incorrect before", 10, rmContext
|
||||||
|
.getRMApps().size());
|
||||||
|
|
||||||
|
// add them to completed apps list
|
||||||
|
addToCompletedApps(appMonitor, rmContext);
|
||||||
|
|
||||||
|
// shouldn't have to many apps
|
||||||
|
appMonitor.checkAppNumCompletedLimit();
|
||||||
|
Assert.assertEquals("Number of apps incorrect after # completed check", 6,
|
||||||
|
rmContext.getRMApps().size());
|
||||||
|
Assert.assertEquals("Number of completed apps incorrect after check", 2,
|
||||||
|
appMonitor.getCompletedAppsListSize());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRMAppRetireNullApp() throws Exception {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
|
||||||
|
RMContext rmContext = mockRMContext(10, now - 20000);
|
||||||
|
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
|
||||||
|
|
||||||
|
Assert.assertEquals("Number of apps incorrect before", 10, rmContext
|
||||||
|
.getRMApps().size());
|
||||||
|
|
||||||
|
appMonitor.addCompletedApp(null);
|
||||||
|
|
||||||
|
Assert.assertEquals("Number of completed apps incorrect after check", 0,
|
||||||
|
appMonitor.getCompletedAppsListSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRMAppRetireZeroSetting() throws Exception {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
|
||||||
|
RMContext rmContext = mockRMContext(10, now - 20000);
|
||||||
|
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
|
||||||
|
|
||||||
|
Assert.assertEquals("Number of apps incorrect before", 10, rmContext
|
||||||
|
.getRMApps().size());
|
||||||
|
|
||||||
|
// test with 0
|
||||||
|
appMonitor.setCompletedAppsMax(0);
|
||||||
|
|
||||||
|
addToCompletedApps(appMonitor, rmContext);
|
||||||
|
Assert.assertEquals("Number of completed apps incorrect", 10,
|
||||||
|
appMonitor.getCompletedAppsListSize());
|
||||||
|
|
||||||
|
appMonitor.checkAppNumCompletedLimit();
|
||||||
|
|
||||||
|
Assert.assertEquals("Number of apps incorrect after # completed check", 0,
|
||||||
|
rmContext.getRMApps().size());
|
||||||
|
Assert.assertEquals("Number of completed apps incorrect after check", 0,
|
||||||
|
appMonitor.getCompletedAppsListSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void setupDispatcher(RMContext rmContext, Configuration conf) {
|
||||||
|
TestDispatcher testDispatcher = new TestDispatcher(rmContext);
|
||||||
|
TestAppManagerDispatcher testAppManagerDispatcher = new TestAppManagerDispatcher(rmContext);
|
||||||
|
rmContext.getDispatcher().register(RMAppEventType.class, testDispatcher);
|
||||||
|
rmContext.getDispatcher().register(RMAppManagerEventType.class, testAppManagerDispatcher);
|
||||||
|
((Service)rmContext.getDispatcher()).init(conf);
|
||||||
|
((Service)rmContext.getDispatcher()).start();
|
||||||
|
Assert.assertEquals("app event type is wrong before", RMAppEventType.KILL, appEventType);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRMAppSubmit() throws Exception {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
|
||||||
|
RMContext rmContext = mockRMContext(0, now - 10);
|
||||||
|
ResourceScheduler scheduler = new CapacityScheduler();
|
||||||
|
ApplicationMasterService masterService = new ApplicationMasterService(rmContext,
|
||||||
|
new ApplicationTokenSecretManager(), scheduler);
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
|
||||||
|
new ClientToAMSecretManager(), scheduler, masterService, conf);
|
||||||
|
|
||||||
|
ApplicationId appID = MockApps.newAppID(1);
|
||||||
|
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||||
|
ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
|
||||||
|
context.setApplicationId(appID);
|
||||||
|
setupDispatcher(rmContext, conf);
|
||||||
|
|
||||||
|
appMonitor.submitApplication(context);
|
||||||
|
RMApp app = rmContext.getRMApps().get(appID);
|
||||||
|
Assert.assertNotNull("app is null", app);
|
||||||
|
Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
|
||||||
|
Assert.assertEquals("app name doesn't match", "N/A", app.getName());
|
||||||
|
Assert.assertEquals("app queue doesn't match", "default", app.getQueue());
|
||||||
|
Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState());
|
||||||
|
Assert.assertNotNull("app store is null", app.getApplicationStore());
|
||||||
|
|
||||||
|
// wait for event to be processed
|
||||||
|
int timeoutSecs = 0;
|
||||||
|
while ((getAppEventType() == RMAppEventType.KILL) &&
|
||||||
|
timeoutSecs++ < 20) {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
Assert.assertEquals("app event type sent is wrong", RMAppEventType.START, getAppEventType());
|
||||||
|
setAppEventType(RMAppEventType.KILL);
|
||||||
|
((Service)rmContext.getDispatcher()).stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRMAppSubmitWithQueueAndName() throws Exception {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
|
||||||
|
RMContext rmContext = mockRMContext(1, now - 10);
|
||||||
|
ResourceScheduler scheduler = new CapacityScheduler();
|
||||||
|
ApplicationMasterService masterService = new ApplicationMasterService(rmContext,
|
||||||
|
new ApplicationTokenSecretManager(), scheduler);
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
|
||||||
|
new ClientToAMSecretManager(), scheduler, masterService, conf);
|
||||||
|
|
||||||
|
ApplicationId appID = MockApps.newAppID(10);
|
||||||
|
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||||
|
ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
|
||||||
|
context.setApplicationId(appID);
|
||||||
|
context.setApplicationName("testApp1");
|
||||||
|
context.setQueue("testQueue");
|
||||||
|
|
||||||
|
setupDispatcher(rmContext, conf);
|
||||||
|
|
||||||
|
appMonitor.submitApplication(context);
|
||||||
|
RMApp app = rmContext.getRMApps().get(appID);
|
||||||
|
Assert.assertNotNull("app is null", app);
|
||||||
|
Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
|
||||||
|
Assert.assertEquals("app name doesn't match", "testApp1", app.getName());
|
||||||
|
Assert.assertEquals("app queue doesn't match", "testQueue", app.getQueue());
|
||||||
|
Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState());
|
||||||
|
Assert.assertNotNull("app store is null", app.getApplicationStore());
|
||||||
|
|
||||||
|
// wait for event to be processed
|
||||||
|
int timeoutSecs = 0;
|
||||||
|
while ((getAppEventType() == RMAppEventType.KILL) &&
|
||||||
|
timeoutSecs++ < 20) {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
Assert.assertEquals("app event type sent is wrong", RMAppEventType.START, getAppEventType());
|
||||||
|
setAppEventType(RMAppEventType.KILL);
|
||||||
|
((Service)rmContext.getDispatcher()).stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRMAppSubmitError() throws Exception {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
|
||||||
|
// specify 1 here and use same appId below so it gets duplicate entry
|
||||||
|
RMContext rmContext = mockRMContext(1, now - 10);
|
||||||
|
ResourceScheduler scheduler = new CapacityScheduler();
|
||||||
|
ApplicationMasterService masterService = new ApplicationMasterService(rmContext,
|
||||||
|
new ApplicationTokenSecretManager(), scheduler);
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
|
||||||
|
new ClientToAMSecretManager(), scheduler, masterService, conf);
|
||||||
|
|
||||||
|
ApplicationId appID = MockApps.newAppID(0);
|
||||||
|
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||||
|
ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
|
||||||
|
context.setApplicationId(appID);
|
||||||
|
context.setApplicationName("testApp1");
|
||||||
|
context.setQueue("testQueue");
|
||||||
|
|
||||||
|
setupDispatcher(rmContext, conf);
|
||||||
|
|
||||||
|
RMApp appOrig = rmContext.getRMApps().get(appID);
|
||||||
|
Assert.assertTrue("app name matches but shouldn't", "testApp1" != appOrig.getName());
|
||||||
|
|
||||||
|
// our testApp1 should be rejected and original app with same id should be left in place
|
||||||
|
appMonitor.submitApplication(context);
|
||||||
|
|
||||||
|
// make sure original app didn't get removed
|
||||||
|
RMApp app = rmContext.getRMApps().get(appID);
|
||||||
|
Assert.assertNotNull("app is null", app);
|
||||||
|
Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
|
||||||
|
Assert.assertEquals("app name doesn't matches", appOrig.getName(), app.getName());
|
||||||
|
((Service)rmContext.getDispatcher()).stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,106 @@
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
|
import org.apache.hadoop.yarn.MockApps;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
|
|
||||||
|
public class MockRMApp implements RMApp {
|
||||||
|
static final int DT = 1000000; // ms
|
||||||
|
|
||||||
|
String user = MockApps.newUserName();
|
||||||
|
String name = MockApps.newAppName();
|
||||||
|
String queue = MockApps.newQueue();
|
||||||
|
long start = System.currentTimeMillis() - (int) (Math.random() * DT);
|
||||||
|
long finish = 0;
|
||||||
|
RMAppState state = RMAppState.NEW;
|
||||||
|
int failCount = 0;
|
||||||
|
ApplicationId id;
|
||||||
|
|
||||||
|
public MockRMApp(int newid, long time, RMAppState newState) {
|
||||||
|
finish = time;
|
||||||
|
id = MockApps.newAppID(newid);
|
||||||
|
state = newState;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MockRMApp(int newid, long time, RMAppState newState, String userName) {
|
||||||
|
this(newid, time, newState);
|
||||||
|
user = userName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ApplicationId getApplicationId() {
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RMAppState getState() {
|
||||||
|
return state;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getUser() {
|
||||||
|
return user;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public float getProgress() {
|
||||||
|
return (float) 0.0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RMAppAttempt getRMAppAttempt(ApplicationAttemptId appAttemptId) {
|
||||||
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getQueue() {
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getName() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RMAppAttempt getCurrentAppAttempt() {
|
||||||
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ApplicationReport createAndGetApplicationReport() {
|
||||||
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ApplicationStore getApplicationStore() {
|
||||||
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getFinishTime() {
|
||||||
|
return finish;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getStartTime() {
|
||||||
|
return start;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getTrackingUrl() {
|
||||||
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public StringBuilder getDiagnostics() {
|
||||||
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void handle(RMAppEvent event) {
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
|
@ -262,6 +262,18 @@ public class TestRMAppTransitions {
|
||||||
assertKilled(application);
|
assertKilled(application);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppNewReject() throws IOException {
|
||||||
|
LOG.info("--- START: testAppNewReject ---");
|
||||||
|
|
||||||
|
RMApp application = createNewTestApp();
|
||||||
|
// NEW => FAILED event RMAppEventType.APP_REJECTED
|
||||||
|
String rejectedText = "Test Application Rejected";
|
||||||
|
RMAppEvent event = new RMAppRejectedEvent(application.getApplicationId(), rejectedText);
|
||||||
|
application.handle(event);
|
||||||
|
assertFailed(application, rejectedText);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAppSubmittedRejected() throws IOException {
|
public void testAppSubmittedRejected() throws IOException {
|
||||||
LOG.info("--- START: testAppSubmittedRejected ---");
|
LOG.info("--- START: testAppSubmittedRejected ---");
|
||||||
|
|
Loading…
Reference in New Issue