YARN-1149. NM throws InvalidStateTransitonException: Invalid event: APPLICATION_LOG_HANDLING_FINISHED at RUNNING. Contributed by Xuan Gong.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1529043 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hitesh Shah 2013-10-04 00:43:26 +00:00
parent 8ebf37f369
commit d3841bd499
16 changed files with 230 additions and 145 deletions

View File

@ -147,6 +147,9 @@ Release 2.1.2 - UNRELEASED
YARN-1256. NM silently ignores non-existent service in YARN-1256. NM silently ignores non-existent service in
StartContainerRequest (Xuan Gong via bikas) StartContainerRequest (Xuan Gong via bikas)
YARN-1149. NM throws InvalidStateTransitonException: Invalid event:
APPLICATION_LOG_HANDLING_FINISHED at RUNNING (Xuan Gong via hitesh)
Release 2.1.1-beta - 2013-09-23 Release 2.1.1-beta - 2013-09-23
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -107,6 +107,7 @@ public class TestAMRMClient {
// start minicluster // start minicluster
conf = new YarnConfiguration(); conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100); conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1); yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
yarnCluster.init(conf); yarnCluster.init(conf);
yarnCluster.start(); yarnCluster.start();

View File

@ -27,13 +27,31 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEventType;
public class CMgrCompletedAppsEvent extends ContainerManagerEvent { public class CMgrCompletedAppsEvent extends ContainerManagerEvent {
private final List<ApplicationId> appsToCleanup; private final List<ApplicationId> appsToCleanup;
private final Reason reason;
public CMgrCompletedAppsEvent(List<ApplicationId> appsToCleanup) { public CMgrCompletedAppsEvent(List<ApplicationId> appsToCleanup, Reason reason) {
super(ContainerManagerEventType.FINISH_APPS); super(ContainerManagerEventType.FINISH_APPS);
this.appsToCleanup = appsToCleanup; this.appsToCleanup = appsToCleanup;
this.reason = reason;
} }
public List<ApplicationId> getAppsToCleanup() { public List<ApplicationId> getAppsToCleanup() {
return this.appsToCleanup; return this.appsToCleanup;
} }
/**
 * Returns the reason that was supplied when this event was constructed.
 *
 * @return why the listed applications are being finished
 */
public Reason getReason() {
  return this.reason;
}
/**
 * Why a set of applications is being finished on this node. The handler of
 * the FINISH_APPS event uses this value to pick the diagnostic message
 * attached to each application's finish event.
 */
public enum Reason {
  /**
   * The applications are finished because the NodeManager itself is
   * cleaning up its applications (for example while it is shutting down).
   */
  ON_SHUTDOWN,

  /**
   * The applications are finished because the ResourceManager listed them
   * for cleanup.
   */
  BY_RESOURCEMANAGER
}
} }

View File

@ -24,24 +24,14 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
public class CMgrCompletedContainersEvent extends ContainerManagerEvent { public class CMgrCompletedContainersEvent extends ContainerManagerEvent {
private List<ContainerId> containerToCleanup; private final List<ContainerId> containerToCleanup;
private Reason reason;
public CMgrCompletedContainersEvent(List<ContainerId> containersToCleanup, Reason reason) { public CMgrCompletedContainersEvent(List<ContainerId> containersToCleanup) {
super(ContainerManagerEventType.FINISH_CONTAINERS); super(ContainerManagerEventType.FINISH_CONTAINERS);
this.containerToCleanup = containersToCleanup; this.containerToCleanup = containersToCleanup;
this.reason = reason;
} }
public List<ContainerId> getContainersToCleanup() { public List<ContainerId> getContainersToCleanup() {
return this.containerToCleanup; return this.containerToCleanup;
} }
public Reason getReason() {
return reason;
}
public static enum Reason {
ON_SHUTDOWN, BY_RESOURCEMANAGER
}
} }

View File

@ -19,9 +19,6 @@
package org.apache.hadoop.yarn.server.nodemanager; package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
@ -67,11 +64,6 @@ public class NodeManager extends CompositeService
* Priority of the NodeManager shutdown hook. * Priority of the NodeManager shutdown hook.
*/ */
public static final int SHUTDOWN_HOOK_PRIORITY = 30; public static final int SHUTDOWN_HOOK_PRIORITY = 30;
/**
* Extra duration to wait for containers to be killed on shutdown.
*/
private static final int SHUTDOWN_CLEANUP_SLOP_MS = 1000;
private static final Log LOG = LogFactory.getLog(NodeManager.class); private static final Log LOG = LogFactory.getLog(NodeManager.class);
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create(); protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
@ -84,8 +76,6 @@ public class NodeManager extends CompositeService
private NodeStatusUpdater nodeStatusUpdater; private NodeStatusUpdater nodeStatusUpdater;
private static CompositeServiceShutdownHook nodeManagerShutdownHook; private static CompositeServiceShutdownHook nodeManagerShutdownHook;
private long waitForContainersOnShutdownMillis;
private AtomicBoolean isStopping = new AtomicBoolean(false); private AtomicBoolean isStopping = new AtomicBoolean(false);
public NodeManager() { public NodeManager() {
@ -193,13 +183,6 @@ public class NodeManager extends CompositeService
// so that we make sure everything is up before registering with RM. // so that we make sure everything is up before registering with RM.
addService(nodeStatusUpdater); addService(nodeStatusUpdater);
waitForContainersOnShutdownMillis =
conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) +
SHUTDOWN_CLEANUP_SLOP_MS;
super.serviceInit(conf); super.serviceInit(conf);
// TODO add local dirs to del // TODO add local dirs to del
} }
@ -219,9 +202,6 @@ public class NodeManager extends CompositeService
if (isStopping.getAndSet(true)) { if (isStopping.getAndSet(true)) {
return; return;
} }
if (context != null) {
cleanupContainers(NodeManagerEventType.SHUTDOWN);
}
super.serviceStop(); super.serviceStop();
DefaultMetricsSystem.shutdown(); DefaultMetricsSystem.shutdown();
} }
@ -246,68 +226,12 @@ public class NodeManager extends CompositeService
public void run() { public void run() {
LOG.info("Notifying ContainerManager to block new container-requests"); LOG.info("Notifying ContainerManager to block new container-requests");
containerManager.setBlockNewContainerRequests(true); containerManager.setBlockNewContainerRequests(true);
cleanupContainers(NodeManagerEventType.RESYNC); containerManager.cleanUpApplications(NodeManagerEventType.RESYNC);
((NodeStatusUpdaterImpl) nodeStatusUpdater ).rebootNodeStatusUpdater(); ((NodeStatusUpdaterImpl) nodeStatusUpdater ).rebootNodeStatusUpdater();
} }
}.start(); }.start();
} }
@SuppressWarnings("unchecked")
protected void cleanupContainers(NodeManagerEventType eventType) {
Map<ContainerId, Container> containers = context.getContainers();
if (containers.isEmpty()) {
return;
}
LOG.info("Containers still running on " + eventType + " : "
+ containers.keySet());
List<ContainerId> containerIds =
new ArrayList<ContainerId>(containers.keySet());
dispatcher.getEventHandler().handle(
new CMgrCompletedContainersEvent(containerIds,
CMgrCompletedContainersEvent.Reason.ON_SHUTDOWN));
LOG.info("Waiting for containers to be killed");
switch (eventType) {
case SHUTDOWN:
long waitStartTime = System.currentTimeMillis();
while (!containers.isEmpty()
&& System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) {
try {
//To remove done containers in NM context
nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
Thread.sleep(1000);
} catch (InterruptedException ex) {
LOG.warn("Interrupted while sleeping on container kill on shutdown",
ex);
}
}
break;
case RESYNC:
while (!containers.isEmpty()) {
try {
Thread.sleep(1000);
nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
} catch (InterruptedException ex) {
LOG.warn("Interrupted while sleeping on container kill on resync",
ex);
}
}
break;
default:
LOG.warn("Invalid eventType: " + eventType);
}
// All containers killed
if (containers.isEmpty()) {
LOG.info("All containers in DONE state");
} else {
LOG.info("Done waiting for containers to be killed. Still alive: " +
containers.keySet());
}
}
public static class NMContext implements Context { public static class NMContext implements Context {
private NodeId nodeId = null; private NodeId nodeId = null;

View File

@ -499,18 +499,18 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
lastHeartBeatID = response.getResponseId(); lastHeartBeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response List<ContainerId> containersToCleanup = response
.getContainersToCleanup(); .getContainersToCleanup();
if (containersToCleanup.size() != 0) { if (!containersToCleanup.isEmpty()) {
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
new CMgrCompletedContainersEvent(containersToCleanup, new CMgrCompletedContainersEvent(containersToCleanup));
CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
} }
List<ApplicationId> appsToCleanup = List<ApplicationId> appsToCleanup =
response.getApplicationsToCleanup(); response.getApplicationsToCleanup();
//Only start tracking for keepAlive on FINISH_APP //Only start tracking for keepAlive on FINISH_APP
trackAppsForKeepAlive(appsToCleanup); trackAppsForKeepAlive(appsToCleanup);
if (appsToCleanup.size() != 0) { if (!appsToCleanup.isEmpty()) {
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
new CMgrCompletedAppsEvent(appsToCleanup)); new CMgrCompletedAppsEvent(appsToCleanup,
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
} }
} catch (ConnectException e) { } catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM //catch and throw the exception if tried MAX wait time to connect RM

View File

@ -30,6 +30,9 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -71,6 +74,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException;
import org.apache.hadoop.yarn.exceptions.InvalidContainerException; import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException; import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@ -83,6 +87,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
import org.apache.hadoop.yarn.server.nodemanager.NodeManagerEventType;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
@ -120,6 +125,11 @@ public class ContainerManagerImpl extends CompositeService implements
ServiceStateChangeListener, ContainerManagementProtocol, ServiceStateChangeListener, ContainerManagementProtocol,
EventHandler<ContainerManagerEvent> { EventHandler<ContainerManagerEvent> {
/**
* Extra duration to wait for applications to be killed on shutdown.
*/
private static final int SHUTDOWN_CLEANUP_SLOP_MS = 1000;
private static final Log LOG = LogFactory.getLog(ContainerManagerImpl.class); private static final Log LOG = LogFactory.getLog(ContainerManagerImpl.class);
final Context context; final Context context;
@ -138,6 +148,11 @@ public class ContainerManagerImpl extends CompositeService implements
private final DeletionService deletionService; private final DeletionService deletionService;
private AtomicBoolean blockNewContainerRequests = new AtomicBoolean(false); private AtomicBoolean blockNewContainerRequests = new AtomicBoolean(false);
private boolean serviceStopped = false;
private final ReadLock readLock;
private final WriteLock writeLock;
private long waitForContainersOnShutdownMillis;
public ContainerManagerImpl(Context context, ContainerExecutor exec, public ContainerManagerImpl(Context context, ContainerExecutor exec,
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
@ -181,6 +196,10 @@ public class ContainerManagerImpl extends CompositeService implements
dispatcher.register(ContainersLauncherEventType.class, containersLauncher); dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
addService(dispatcher); addService(dispatcher);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
} }
@Override @Override
@ -190,6 +209,13 @@ public class ContainerManagerImpl extends CompositeService implements
addIfService(logHandler); addIfService(logHandler);
dispatcher.register(LogHandlerEventType.class, logHandler); dispatcher.register(LogHandlerEventType.class, logHandler);
waitForContainersOnShutdownMillis =
conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) +
SHUTDOWN_CLEANUP_SLOP_MS;
super.serviceInit(conf); super.serviceInit(conf);
} }
@ -275,6 +301,16 @@ public class ContainerManagerImpl extends CompositeService implements
@Override @Override
public void serviceStop() throws Exception { public void serviceStop() throws Exception {
setBlockNewContainerRequests(true);
this.writeLock.lock();
try {
serviceStopped = true;
if (context != null) {
cleanUpApplications(NodeManagerEventType.SHUTDOWN);
}
} finally {
this.writeLock.unlock();
}
if (auxiliaryServices.getServiceState() == STARTED) { if (auxiliaryServices.getServiceState() == STARTED) {
auxiliaryServices.unregisterServiceListener(this); auxiliaryServices.unregisterServiceListener(this);
} }
@ -284,6 +320,60 @@ public class ContainerManagerImpl extends CompositeService implements
super.serviceStop(); super.serviceStop();
} }
/**
 * Asks every application currently known to this NodeManager to finish and
 * then waits for the application map in the context to drain.
 *
 * <p>For {@code SHUTDOWN} the wait is bounded by
 * {@code waitForContainersOnShutdownMillis}; for {@code RESYNC} the method
 * waits until no application is left. The map is polled once per second.
 *
 * <p>An interrupt received while waiting does not cut the wait short (same
 * as before), but the thread's interrupt status is restored before this
 * method returns so that callers can still observe it.
 *
 * @param eventType the NodeManager event that triggered the cleanup; only
 *          {@code SHUTDOWN} and {@code RESYNC} are supported
 * @throws YarnRuntimeException if {@code eventType} is any other value
 */
public void cleanUpApplications(NodeManagerEventType eventType) {
  Map<ApplicationId, Application> applications =
      this.context.getApplications();
  if (applications.isEmpty()) {
    return;
  }
  LOG.info("Applications still running : " + applications.keySet());

  // Snapshot the ids: the live map shrinks as applications finish.
  List<ApplicationId> appIds =
      new ArrayList<ApplicationId>(applications.keySet());
  // NOTE(review): the reason is ON_SHUTDOWN for RESYNC as well, so the
  // diagnostic will read "killed on shutdown" in both cases - confirm that
  // this is intended.
  this.handle(
      new CMgrCompletedAppsEvent(appIds,
          CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN));

  LOG.info("Waiting for Applications to be Finished");

  // Thread.sleep clears the interrupt flag when it throws. Remember that an
  // interrupt happened and re-assert it once the waiting is over; setting it
  // back immediately would make every following sleep throw at once.
  boolean interrupted = false;
  try {
    switch (eventType) {
      case SHUTDOWN:
        long waitStartTime = System.currentTimeMillis();
        while (!applications.isEmpty()
            && System.currentTimeMillis() - waitStartTime
                < waitForContainersOnShutdownMillis) {
          try {
            Thread.sleep(1000);
          } catch (InterruptedException ex) {
            interrupted = true;
            LOG.warn("Interrupted while sleeping on applications finish on shutdown",
                ex);
          }
        }
        break;
      case RESYNC:
        while (!applications.isEmpty()) {
          try {
            Thread.sleep(1000);
          } catch (InterruptedException ex) {
            interrupted = true;
            LOG.warn("Interrupted while sleeping on applications finish on resync",
                ex);
          }
        }
        break;
      default:
        throw new YarnRuntimeException("Get an unknown NodeManagerEventType: "
            + eventType);
    }
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  // All applications Finished
  if (applications.isEmpty()) {
    LOG.info("All applications in FINISHED state");
  } else {
    LOG.info("Done waiting for Applications to be Finished. Still alive: " +
        applications.keySet());
  }
}
// Get the remoteUGI corresponding to the api call. // Get the remoteUGI corresponding to the api call.
protected UserGroupInformation getRemoteUgi() protected UserGroupInformation getRemoteUgi()
throws YarnException { throws YarnException {
@ -479,29 +569,40 @@ public class ContainerManagerImpl extends CompositeService implements
+ " already is running on this node!!"); + " already is running on this node!!");
} }
// Create the application this.readLock.lock();
Application application = try {
new ApplicationImpl(dispatcher, user, applicationID, credentials, context); if (!serviceStopped) {
if (null == context.getApplications().putIfAbsent(applicationID, // Create the application
application)) { Application application =
LOG.info("Creating a new application reference for app " + applicationID); new ApplicationImpl(dispatcher, user, applicationID, credentials, context);
if (null == context.getApplications().putIfAbsent(applicationID,
application)) {
LOG.info("Creating a new application reference for app " + applicationID);
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
new ApplicationInitEvent(applicationID, container.getLaunchContext() new ApplicationInitEvent(applicationID, container.getLaunchContext()
.getApplicationACLs())); .getApplicationACLs()));
}
dispatcher.getEventHandler().handle(
new ApplicationContainerInitEvent(container));
this.context.getContainerTokenSecretManager().startContainerSuccessful(
containerTokenIdentifier);
NMAuditLogger.logSuccess(user, AuditConstants.START_CONTAINER,
"ContainerManageImpl", applicationID, containerId);
// TODO launchedContainer misplaced -> doesn't necessarily mean a container
// launch. A finished Application will not launch containers.
metrics.launchedContainer();
metrics.allocateContainer(containerTokenIdentifier.getResource());
} else {
throw new YarnException(
"Container start failed as the NodeManager is " +
"in the process of shutting down");
}
} finally {
this.readLock.unlock();
} }
dispatcher.getEventHandler().handle(
new ApplicationContainerInitEvent(container));
this.context.getContainerTokenSecretManager().startContainerSuccessful(
containerTokenIdentifier);
NMAuditLogger.logSuccess(user, AuditConstants.START_CONTAINER,
"ContainerManageImpl", applicationID, containerId);
// TODO launchedContainer misplaced -> doesn't necessarily mean a container
// launch. A finished Application will not launch containers.
metrics.launchedContainer();
metrics.allocateContainer(containerTokenIdentifier.getResource());
} }
protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier( protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier(
@ -726,9 +827,15 @@ public class ContainerManagerImpl extends CompositeService implements
CMgrCompletedAppsEvent appsFinishedEvent = CMgrCompletedAppsEvent appsFinishedEvent =
(CMgrCompletedAppsEvent) event; (CMgrCompletedAppsEvent) event;
for (ApplicationId appID : appsFinishedEvent.getAppsToCleanup()) { for (ApplicationId appID : appsFinishedEvent.getAppsToCleanup()) {
String diagnostic = "";
if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN) {
diagnostic = "Application killed on shutdown";
} else if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER) {
diagnostic = "Application killed by ResourceManager";
}
this.dispatcher.getEventHandler().handle( this.dispatcher.getEventHandler().handle(
new ApplicationFinishEvent(appID, new ApplicationFinishEvent(appID,
"Application Killed by ResourceManager")); diagnostic));
} }
break; break;
case FINISH_CONTAINERS: case FINISH_CONTAINERS:
@ -736,20 +843,14 @@ public class ContainerManagerImpl extends CompositeService implements
(CMgrCompletedContainersEvent) event; (CMgrCompletedContainersEvent) event;
for (ContainerId container : containersFinishedEvent for (ContainerId container : containersFinishedEvent
.getContainersToCleanup()) { .getContainersToCleanup()) {
String diagnostic = ""; this.dispatcher.getEventHandler().handle(
if (containersFinishedEvent.getReason() == new ContainerKillEvent(container,
CMgrCompletedContainersEvent.Reason.ON_SHUTDOWN) { "Container Killed by ResourceManager"));
diagnostic = "Container Killed on Shutdown";
} else if (containersFinishedEvent.getReason() ==
CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER) {
diagnostic = "Container Killed by ResourceManager";
}
this.dispatcher.getEventHandler().handle(
new ContainerKillEvent(container, diagnostic));
} }
break; break;
default: default:
LOG.warn("Invalid event " + event.getType() + ". Ignoring."); throw new YarnRuntimeException(
"Get an unknown ContainerManagerEvent type: " + event.getType());
} }
} }

View File

@ -177,6 +177,13 @@ public class ApplicationImpl implements Application {
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP), ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
ApplicationEventType.APPLICATION_CONTAINER_FINISHED, ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
new AppFinishTransition()) new AppFinishTransition())
.addTransition(ApplicationState.FINISHING_CONTAINERS_WAIT,
ApplicationState.FINISHING_CONTAINERS_WAIT,
EnumSet.of(
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED,
ApplicationEventType.APPLICATION_INITED,
ApplicationEventType.FINISH_APPLICATION))
// Transitions from APPLICATION_RESOURCES_CLEANINGUP state // Transitions from APPLICATION_RESOURCES_CLEANINGUP state
.addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP, .addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
@ -186,12 +193,25 @@ public class ApplicationImpl implements Application {
ApplicationState.FINISHED, ApplicationState.FINISHED,
ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP, ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP,
new AppCompletelyDoneTransition()) new AppCompletelyDoneTransition())
.addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
EnumSet.of(
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED,
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
ApplicationEventType.APPLICATION_INITED,
ApplicationEventType.FINISH_APPLICATION))
// Transitions from FINISHED state // Transitions from FINISHED state
.addTransition(ApplicationState.FINISHED, .addTransition(ApplicationState.FINISHED,
ApplicationState.FINISHED, ApplicationState.FINISHED,
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
new AppLogsAggregatedTransition()) new AppLogsAggregatedTransition())
.addTransition(ApplicationState.FINISHED, ApplicationState.FINISHED,
EnumSet.of(
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED,
ApplicationEventType.FINISH_APPLICATION))
// create the topology tables // create the topology tables
.installTopology(); .installTopology();
@ -343,7 +363,7 @@ public class ApplicationImpl implements Application {
@Override @Override
public ApplicationState transition(ApplicationImpl app, public ApplicationState transition(ApplicationImpl app,
ApplicationEvent event) { ApplicationEvent event) {
ApplicationFinishEvent appEvent = (ApplicationFinishEvent)event;
if (app.containers.isEmpty()) { if (app.containers.isEmpty()) {
// No container to cleanup. Cleanup app level resources. // No container to cleanup. Cleanup app level resources.
app.handleAppFinishWithContainersCleanedup(); app.handleAppFinishWithContainersCleanedup();
@ -355,7 +375,7 @@ public class ApplicationImpl implements Application {
for (ContainerId containerID : app.containers.keySet()) { for (ContainerId containerID : app.containers.keySet()) {
app.dispatcher.getEventHandler().handle( app.dispatcher.getEventHandler().handle(
new ContainerKillEvent(containerID, new ContainerKillEvent(containerID,
"Container killed on application-finish event from RM.")); "Container killed on application-finish event: " + appEvent.getDiagnostic()));
} }
return ApplicationState.FINISHING_CONTAINERS_WAIT; return ApplicationState.FINISHING_CONTAINERS_WAIT;
} }

View File

@ -288,6 +288,7 @@ public class TestNodeManagerReboot {
conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346"); conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346");
conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath()); conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath()); conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
return conf; return conf;
} }
} }

View File

@ -143,6 +143,7 @@ public class TestNodeManagerResync {
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
remoteLogsDir.getAbsolutePath()); remoteLogsDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath()); conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
return conf; return conf;
} }

View File

@ -242,6 +242,7 @@ public class TestNodeManagerShutdown {
conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath()); conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogsDir.getAbsolutePath()); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogsDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath()); conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
return conf; return conf;
} }

View File

@ -454,13 +454,13 @@ public class TestNodeStatusUpdater {
@Override @Override
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {
System.out.println("Called stooppppp");
super.serviceStop(); super.serviceStop();
isStopped = true; isStopped = true;
ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager ConcurrentMap<ApplicationId, Application> applications =
.containermanager.container.Container> containers = getNMContext().getApplications();
getNMContext().getContainers(); // ensure that applications are empty
// ensure that containers are empty if(!applications.isEmpty()) {
if(!containers.isEmpty()) {
assertionFailedInThread.set(true); assertionFailedInThread.set(true);
} }
syncBarrier.await(10000, TimeUnit.MILLISECONDS); syncBarrier.await(10000, TimeUnit.MILLISECONDS);
@ -859,9 +859,20 @@ public class TestNodeStatusUpdater {
} }
@Override @Override
protected void cleanupContainers(NodeManagerEventType eventType) { protected ContainerManagerImpl createContainerManager(Context context,
super.cleanupContainers(NodeManagerEventType.SHUTDOWN); ContainerExecutor exec, DeletionService del,
numCleanups.incrementAndGet(); NodeStatusUpdater nodeStatusUpdater,
ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) {
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
metrics, aclsManager, dirsHandler) {
@Override
public void cleanUpApplications(NodeManagerEventType eventType) {
super.cleanUpApplications(NodeManagerEventType.SHUTDOWN);
numCleanups.incrementAndGet();
}
};
} }
}; };
@ -1161,6 +1172,7 @@ public class TestNodeStatusUpdater {
.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
connectionRetryIntervalMs); connectionRetryIntervalMs);
conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 5000); conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 5000);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
CyclicBarrier syncBarrier = new CyclicBarrier(2); CyclicBarrier syncBarrier = new CyclicBarrier(2);
nm = new MyNodeManager2(syncBarrier, conf); nm = new MyNodeManager2(syncBarrier, conf);
nm.init(conf); nm.init(conf);
@ -1201,9 +1213,20 @@ public class TestNodeStatusUpdater {
} }
@Override @Override
protected void cleanupContainers(NodeManagerEventType eventType) { protected ContainerManagerImpl createContainerManager(Context context,
super.cleanupContainers(NodeManagerEventType.SHUTDOWN); ContainerExecutor exec, DeletionService del,
numCleanups.incrementAndGet(); NodeStatusUpdater nodeStatusUpdater,
ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) {
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
metrics, aclsManager, dirsHandler) {
@Override
public void cleanUpApplications(NodeManagerEventType eventType) {
super.cleanUpApplications(NodeManagerEventType.SHUTDOWN);
numCleanups.incrementAndGet();
}
};
} }
}; };
@ -1345,6 +1368,7 @@ public class TestNodeStatusUpdater {
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
remoteLogsDir.getAbsolutePath()); remoteLogsDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath()); conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
return conf; return conf;
} }

View File

@ -166,6 +166,7 @@ public abstract class BaseContainerManagerTest {
conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogDir.getAbsolutePath()); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogDir.getAbsolutePath());
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
// Default delSrvc // Default delSrvc
delSrvc = createDeletionService(); delSrvc = createDeletionService();
delSrvc.init(conf); delSrvc.init(conf);

View File

@ -542,7 +542,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
// Simulate RM sending an AppFinish event. // Simulate RM sending an AppFinish event.
containerManager.handle(new CMgrCompletedAppsEvent(Arrays containerManager.handle(new CMgrCompletedAppsEvent(Arrays
.asList(new ApplicationId[] { appId }))); .asList(new ApplicationId[] { appId }), CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN));
BaseContainerManagerTest.waitForApplicationState(containerManager, BaseContainerManagerTest.waitForApplicationState(containerManager,
cId.getApplicationAttemptId().getApplicationId(), cId.getApplicationAttemptId().getApplicationId(),

View File

@ -586,8 +586,8 @@ public class TestApplication {
} }
public void appFinished() { public void appFinished() {
app.handle(new ApplicationEvent(appId, app.handle(new ApplicationFinishEvent(appId,
ApplicationEventType.FINISH_APPLICATION)); "Finish Application"));
drainDispatcherEvents(); drainDispatcherEvents();
} }

View File

@ -826,7 +826,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
cId, ContainerState.COMPLETE); cId, ContainerState.COMPLETE);
this.containerManager.handle(new CMgrCompletedAppsEvent(Arrays this.containerManager.handle(new CMgrCompletedAppsEvent(Arrays
.asList(appId))); .asList(appId), CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN));
this.containerManager.stop(); this.containerManager.stop();
} }