Merge 1529043 from trunk to branch-2 for YARN-1149. NM throws InvalidStateTransitonException: Invalid event: APPLICATION_LOG_HANDLING_FINISHED at RUNNING. Contributed by Xuan Gong.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1529044 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hitesh Shah 2013-10-04 00:46:18 +00:00
parent 33b56c4866
commit 75aeb6070a
16 changed files with 230 additions and 145 deletions

View File

@ -129,6 +129,9 @@ Release 2.1.2 - UNRELEASED
YARN-1236. FairScheduler setting queue name in RMApp is not working.
(Sandy Ryza)
YARN-1149. NM throws InvalidStateTransitonException: Invalid event:
APPLICATION_LOG_HANDLING_FINISHED at RUNNING (Xuan Gong via hitesh)
Release 2.1.1-beta - 2013-09-23
INCOMPATIBLE CHANGES

View File

@ -107,6 +107,7 @@ public class TestAMRMClient {
// start minicluster
conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();

View File

@ -27,13 +27,31 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEventType;
public class CMgrCompletedAppsEvent extends ContainerManagerEvent {
private final List<ApplicationId> appsToCleanup;
private final Reason reason;
public CMgrCompletedAppsEvent(List<ApplicationId> appsToCleanup) {
public CMgrCompletedAppsEvent(List<ApplicationId> appsToCleanup, Reason reason) {
super(ContainerManagerEventType.FINISH_APPS);
this.appsToCleanup = appsToCleanup;
this.reason = reason;
}
public List<ApplicationId> getAppsToCleanup() {
return this.appsToCleanup;
}
public Reason getReason() {
return reason;
}
public static enum Reason {
/**
* Application is killed as NodeManager is shut down
*/
ON_SHUTDOWN,
/**
* Application is killed by ResourceManager
*/
BY_RESOURCEMANAGER
}
}

View File

@ -24,24 +24,14 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
public class CMgrCompletedContainersEvent extends ContainerManagerEvent {
private List<ContainerId> containerToCleanup;
private Reason reason;
private final List<ContainerId> containerToCleanup;
public CMgrCompletedContainersEvent(List<ContainerId> containersToCleanup, Reason reason) {
public CMgrCompletedContainersEvent(List<ContainerId> containersToCleanup) {
super(ContainerManagerEventType.FINISH_CONTAINERS);
this.containerToCleanup = containersToCleanup;
this.reason = reason;
}
public List<ContainerId> getContainersToCleanup() {
return this.containerToCleanup;
}
public Reason getReason() {
return reason;
}
public static enum Reason {
ON_SHUTDOWN, BY_RESOURCEMANAGER
}
}

View File

@ -19,9 +19,6 @@
package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
@ -67,11 +64,6 @@ public class NodeManager extends CompositeService
* Priority of the NodeManager shutdown hook.
*/
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
/**
* Extra duration to wait for containers to be killed on shutdown.
*/
private static final int SHUTDOWN_CLEANUP_SLOP_MS = 1000;
private static final Log LOG = LogFactory.getLog(NodeManager.class);
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
@ -84,8 +76,6 @@ public class NodeManager extends CompositeService
private NodeStatusUpdater nodeStatusUpdater;
private static CompositeServiceShutdownHook nodeManagerShutdownHook;
private long waitForContainersOnShutdownMillis;
private AtomicBoolean isStopping = new AtomicBoolean(false);
public NodeManager() {
@ -193,13 +183,6 @@ public class NodeManager extends CompositeService
// so that we make sure everything is up before registering with RM.
addService(nodeStatusUpdater);
waitForContainersOnShutdownMillis =
conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) +
SHUTDOWN_CLEANUP_SLOP_MS;
super.serviceInit(conf);
// TODO add local dirs to del
}
@ -219,9 +202,6 @@ public class NodeManager extends CompositeService
if (isStopping.getAndSet(true)) {
return;
}
if (context != null) {
cleanupContainers(NodeManagerEventType.SHUTDOWN);
}
super.serviceStop();
DefaultMetricsSystem.shutdown();
}
@ -246,68 +226,12 @@ public class NodeManager extends CompositeService
public void run() {
LOG.info("Notifying ContainerManager to block new container-requests");
containerManager.setBlockNewContainerRequests(true);
cleanupContainers(NodeManagerEventType.RESYNC);
containerManager.cleanUpApplications(NodeManagerEventType.RESYNC);
((NodeStatusUpdaterImpl) nodeStatusUpdater ).rebootNodeStatusUpdater();
}
}.start();
}
@SuppressWarnings("unchecked")
protected void cleanupContainers(NodeManagerEventType eventType) {
Map<ContainerId, Container> containers = context.getContainers();
if (containers.isEmpty()) {
return;
}
LOG.info("Containers still running on " + eventType + " : "
+ containers.keySet());
List<ContainerId> containerIds =
new ArrayList<ContainerId>(containers.keySet());
dispatcher.getEventHandler().handle(
new CMgrCompletedContainersEvent(containerIds,
CMgrCompletedContainersEvent.Reason.ON_SHUTDOWN));
LOG.info("Waiting for containers to be killed");
switch (eventType) {
case SHUTDOWN:
long waitStartTime = System.currentTimeMillis();
while (!containers.isEmpty()
&& System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) {
try {
//To remove done containers in NM context
nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
Thread.sleep(1000);
} catch (InterruptedException ex) {
LOG.warn("Interrupted while sleeping on container kill on shutdown",
ex);
}
}
break;
case RESYNC:
while (!containers.isEmpty()) {
try {
Thread.sleep(1000);
nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
} catch (InterruptedException ex) {
LOG.warn("Interrupted while sleeping on container kill on resync",
ex);
}
}
break;
default:
LOG.warn("Invalid eventType: " + eventType);
}
// All containers killed
if (containers.isEmpty()) {
LOG.info("All containers in DONE state");
} else {
LOG.info("Done waiting for containers to be killed. Still alive: " +
containers.keySet());
}
}
public static class NMContext implements Context {
private NodeId nodeId = null;

View File

@ -499,18 +499,18 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
lastHeartBeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response
.getContainersToCleanup();
if (containersToCleanup.size() != 0) {
if (!containersToCleanup.isEmpty()) {
dispatcher.getEventHandler().handle(
new CMgrCompletedContainersEvent(containersToCleanup,
CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
new CMgrCompletedContainersEvent(containersToCleanup));
}
List<ApplicationId> appsToCleanup =
response.getApplicationsToCleanup();
//Only start tracking for keepAlive on FINISH_APP
trackAppsForKeepAlive(appsToCleanup);
if (appsToCleanup.size() != 0) {
if (!appsToCleanup.isEmpty()) {
dispatcher.getEventHandler().handle(
new CMgrCompletedAppsEvent(appsToCleanup));
new CMgrCompletedAppsEvent(appsToCleanup,
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
}
} catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM

View File

@ -30,6 +30,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -70,6 +73,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@ -82,6 +86,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
import org.apache.hadoop.yarn.server.nodemanager.NodeManagerEventType;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
@ -119,6 +124,11 @@ public class ContainerManagerImpl extends CompositeService implements
ServiceStateChangeListener, ContainerManagementProtocol,
EventHandler<ContainerManagerEvent> {
/**
* Extra duration to wait for applications to be killed on shutdown.
*/
private static final int SHUTDOWN_CLEANUP_SLOP_MS = 1000;
private static final Log LOG = LogFactory.getLog(ContainerManagerImpl.class);
final Context context;
@ -137,6 +147,11 @@ public class ContainerManagerImpl extends CompositeService implements
private final DeletionService deletionService;
private AtomicBoolean blockNewContainerRequests = new AtomicBoolean(false);
private boolean serviceStopped = false;
private final ReadLock readLock;
private final WriteLock writeLock;
private long waitForContainersOnShutdownMillis;
public ContainerManagerImpl(Context context, ContainerExecutor exec,
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
@ -180,6 +195,10 @@ public class ContainerManagerImpl extends CompositeService implements
dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
addService(dispatcher);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
}
@Override
@ -189,6 +208,13 @@ public class ContainerManagerImpl extends CompositeService implements
addIfService(logHandler);
dispatcher.register(LogHandlerEventType.class, logHandler);
waitForContainersOnShutdownMillis =
conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) +
SHUTDOWN_CLEANUP_SLOP_MS;
super.serviceInit(conf);
}
@ -274,6 +300,16 @@ public class ContainerManagerImpl extends CompositeService implements
@Override
public void serviceStop() throws Exception {
setBlockNewContainerRequests(true);
this.writeLock.lock();
try {
serviceStopped = true;
if (context != null) {
cleanUpApplications(NodeManagerEventType.SHUTDOWN);
}
} finally {
this.writeLock.unlock();
}
if (auxiliaryServices.getServiceState() == STARTED) {
auxiliaryServices.unregisterServiceListener(this);
}
@ -283,6 +319,60 @@ public class ContainerManagerImpl extends CompositeService implements
super.serviceStop();
}
public void cleanUpApplications(NodeManagerEventType eventType) {
Map<ApplicationId, Application> applications =
this.context.getApplications();
if (applications.isEmpty()) {
return;
}
LOG.info("Applications still running : " + applications.keySet());
List<ApplicationId> appIds =
new ArrayList<ApplicationId>(applications.keySet());
this.handle(
new CMgrCompletedAppsEvent(appIds,
CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN));
LOG.info("Waiting for Applications to be Finished");
switch (eventType) {
case SHUTDOWN:
long waitStartTime = System.currentTimeMillis();
while (!applications.isEmpty()
&& System.currentTimeMillis() - waitStartTime
< waitForContainersOnShutdownMillis) {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
LOG.warn("Interrupted while sleeping on applications finish on shutdown",
ex);
}
}
break;
case RESYNC:
while (!applications.isEmpty()) {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
LOG.warn("Interrupted while sleeping on applications finish on resync",
ex);
}
}
break;
default:
throw new YarnRuntimeException("Get an unknown NodeManagerEventType: "
+ eventType);
}
// All applications Finished
if (applications.isEmpty()) {
LOG.info("All applications in FINISHED state");
} else {
LOG.info("Done waiting for Applications to be Finished. Still alive: " +
applications.keySet());
}
}
// Get the remoteUGI corresponding to the api call.
protected UserGroupInformation getRemoteUgi()
throws YarnException {
@ -466,29 +556,40 @@ public class ContainerManagerImpl extends CompositeService implements
+ " already is running on this node!!");
}
// Create the application
Application application =
new ApplicationImpl(dispatcher, user, applicationID, credentials, context);
if (null == context.getApplications().putIfAbsent(applicationID,
application)) {
LOG.info("Creating a new application reference for app " + applicationID);
this.readLock.lock();
try {
if (!serviceStopped) {
// Create the application
Application application =
new ApplicationImpl(dispatcher, user, applicationID, credentials, context);
if (null == context.getApplications().putIfAbsent(applicationID,
application)) {
LOG.info("Creating a new application reference for app " + applicationID);
dispatcher.getEventHandler().handle(
new ApplicationInitEvent(applicationID, container.getLaunchContext()
.getApplicationACLs()));
dispatcher.getEventHandler().handle(
new ApplicationInitEvent(applicationID, container.getLaunchContext()
.getApplicationACLs()));
}
dispatcher.getEventHandler().handle(
new ApplicationContainerInitEvent(container));
this.context.getContainerTokenSecretManager().startContainerSuccessful(
containerTokenIdentifier);
NMAuditLogger.logSuccess(user, AuditConstants.START_CONTAINER,
"ContainerManageImpl", applicationID, containerId);
// TODO launchedContainer misplaced -> doesn't necessarily mean a container
// launch. A finished Application will not launch containers.
metrics.launchedContainer();
metrics.allocateContainer(containerTokenIdentifier.getResource());
} else {
throw new YarnException(
"Container start failed as the NodeManager is " +
"in the process of shutting down");
}
} finally {
this.readLock.unlock();
}
dispatcher.getEventHandler().handle(
new ApplicationContainerInitEvent(container));
this.context.getContainerTokenSecretManager().startContainerSuccessful(
containerTokenIdentifier);
NMAuditLogger.logSuccess(user, AuditConstants.START_CONTAINER,
"ContainerManageImpl", applicationID, containerId);
// TODO launchedContainer misplaced -> doesn't necessarily mean a container
// launch. A finished Application will not launch containers.
metrics.launchedContainer();
metrics.allocateContainer(containerTokenIdentifier.getResource());
}
protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier(
@ -713,9 +814,15 @@ public class ContainerManagerImpl extends CompositeService implements
CMgrCompletedAppsEvent appsFinishedEvent =
(CMgrCompletedAppsEvent) event;
for (ApplicationId appID : appsFinishedEvent.getAppsToCleanup()) {
String diagnostic = "";
if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN) {
diagnostic = "Application killed on shutdown";
} else if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER) {
diagnostic = "Application killed by ResourceManager";
}
this.dispatcher.getEventHandler().handle(
new ApplicationFinishEvent(appID,
"Application Killed by ResourceManager"));
diagnostic));
}
break;
case FINISH_CONTAINERS:
@ -723,20 +830,14 @@ public class ContainerManagerImpl extends CompositeService implements
(CMgrCompletedContainersEvent) event;
for (ContainerId container : containersFinishedEvent
.getContainersToCleanup()) {
String diagnostic = "";
if (containersFinishedEvent.getReason() ==
CMgrCompletedContainersEvent.Reason.ON_SHUTDOWN) {
diagnostic = "Container Killed on Shutdown";
} else if (containersFinishedEvent.getReason() ==
CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER) {
diagnostic = "Container Killed by ResourceManager";
}
this.dispatcher.getEventHandler().handle(
new ContainerKillEvent(container, diagnostic));
this.dispatcher.getEventHandler().handle(
new ContainerKillEvent(container,
"Container Killed by ResourceManager"));
}
break;
default:
LOG.warn("Invalid event " + event.getType() + ". Ignoring.");
throw new YarnRuntimeException(
"Get an unknown ContainerManagerEvent type: " + event.getType());
}
}

View File

@ -177,6 +177,13 @@ public class ApplicationImpl implements Application {
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
new AppFinishTransition())
.addTransition(ApplicationState.FINISHING_CONTAINERS_WAIT,
ApplicationState.FINISHING_CONTAINERS_WAIT,
EnumSet.of(
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED,
ApplicationEventType.APPLICATION_INITED,
ApplicationEventType.FINISH_APPLICATION))
// Transitions from APPLICATION_RESOURCES_CLEANINGUP state
.addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
@ -186,12 +193,25 @@ public class ApplicationImpl implements Application {
ApplicationState.FINISHED,
ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP,
new AppCompletelyDoneTransition())
.addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
EnumSet.of(
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED,
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
ApplicationEventType.APPLICATION_INITED,
ApplicationEventType.FINISH_APPLICATION))
// Transitions from FINISHED state
.addTransition(ApplicationState.FINISHED,
ApplicationState.FINISHED,
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
new AppLogsAggregatedTransition())
.addTransition(ApplicationState.FINISHED, ApplicationState.FINISHED,
EnumSet.of(
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED,
ApplicationEventType.FINISH_APPLICATION))
// create the topology tables
.installTopology();
@ -343,7 +363,7 @@ public class ApplicationImpl implements Application {
@Override
public ApplicationState transition(ApplicationImpl app,
ApplicationEvent event) {
ApplicationFinishEvent appEvent = (ApplicationFinishEvent)event;
if (app.containers.isEmpty()) {
// No container to cleanup. Cleanup app level resources.
app.handleAppFinishWithContainersCleanedup();
@ -355,7 +375,7 @@ public class ApplicationImpl implements Application {
for (ContainerId containerID : app.containers.keySet()) {
app.dispatcher.getEventHandler().handle(
new ContainerKillEvent(containerID,
"Container killed on application-finish event from RM."));
"Container killed on application-finish event: " + appEvent.getDiagnostic()));
}
return ApplicationState.FINISHING_CONTAINERS_WAIT;
}

View File

@ -288,6 +288,7 @@ public class TestNodeManagerReboot {
conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346");
conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
return conf;
}
}

View File

@ -143,6 +143,7 @@ public class TestNodeManagerResync {
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
remoteLogsDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
return conf;
}

View File

@ -242,6 +242,7 @@ public class TestNodeManagerShutdown {
conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogsDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
return conf;
}

View File

@ -454,13 +454,13 @@ public class TestNodeStatusUpdater {
@Override
protected void serviceStop() throws Exception {
System.out.println("Called stooppppp");
super.serviceStop();
isStopped = true;
ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager
.containermanager.container.Container> containers =
getNMContext().getContainers();
// ensure that containers are empty
if(!containers.isEmpty()) {
ConcurrentMap<ApplicationId, Application> applications =
getNMContext().getApplications();
// ensure that applications are empty
if(!applications.isEmpty()) {
assertionFailedInThread.set(true);
}
syncBarrier.await(10000, TimeUnit.MILLISECONDS);
@ -859,9 +859,20 @@ public class TestNodeStatusUpdater {
}
@Override
protected void cleanupContainers(NodeManagerEventType eventType) {
super.cleanupContainers(NodeManagerEventType.SHUTDOWN);
numCleanups.incrementAndGet();
protected ContainerManagerImpl createContainerManager(Context context,
ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater,
ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) {
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
metrics, aclsManager, dirsHandler) {
@Override
public void cleanUpApplications(NodeManagerEventType eventType) {
super.cleanUpApplications(NodeManagerEventType.SHUTDOWN);
numCleanups.incrementAndGet();
}
};
}
};
@ -1161,6 +1172,7 @@ public class TestNodeStatusUpdater {
.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
connectionRetryIntervalMs);
conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 5000);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
CyclicBarrier syncBarrier = new CyclicBarrier(2);
nm = new MyNodeManager2(syncBarrier, conf);
nm.init(conf);
@ -1201,9 +1213,20 @@ public class TestNodeStatusUpdater {
}
@Override
protected void cleanupContainers(NodeManagerEventType eventType) {
super.cleanupContainers(NodeManagerEventType.SHUTDOWN);
numCleanups.incrementAndGet();
protected ContainerManagerImpl createContainerManager(Context context,
ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater,
ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) {
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
metrics, aclsManager, dirsHandler) {
@Override
public void cleanUpApplications(NodeManagerEventType eventType) {
super.cleanUpApplications(NodeManagerEventType.SHUTDOWN);
numCleanups.incrementAndGet();
}
};
}
};
@ -1345,6 +1368,7 @@ public class TestNodeStatusUpdater {
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
remoteLogsDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
return conf;
}

View File

@ -166,6 +166,7 @@ public abstract class BaseContainerManagerTest {
conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogDir.getAbsolutePath());
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
// Default delSrvc
delSrvc = createDeletionService();
delSrvc.init(conf);

View File

@ -538,7 +538,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
// Simulate RM sending an AppFinish event.
containerManager.handle(new CMgrCompletedAppsEvent(Arrays
.asList(new ApplicationId[] { appId })));
.asList(new ApplicationId[] { appId }), CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN));
BaseContainerManagerTest.waitForApplicationState(containerManager,
cId.getApplicationAttemptId().getApplicationId(),

View File

@ -586,8 +586,8 @@ public class TestApplication {
}
public void appFinished() {
app.handle(new ApplicationEvent(appId,
ApplicationEventType.FINISH_APPLICATION));
app.handle(new ApplicationFinishEvent(appId,
"Finish Application"));
drainDispatcherEvents();
}

View File

@ -826,7 +826,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
cId, ContainerState.COMPLETE);
this.containerManager.handle(new CMgrCompletedAppsEvent(Arrays
.asList(appId)));
.asList(appId), CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN));
this.containerManager.stop();
}