MAPREDUCE 3738. NM can hang during shutdown if AppLogAggregatorImpl thread dies unexpectedly. (Contributed by Jason Lowe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1293060 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Siddharth Seth 2012-02-24 02:12:06 +00:00
parent 81f5c5933d
commit 6112940841
3 changed files with 41 additions and 3 deletions

View File

@ -140,6 +140,9 @@ Release 0.23.2 - UNRELEASED
MAPREDUCE-3878. Null user on filtered jobhistory job page (Jonathon Eagles MAPREDUCE-3878. Null user on filtered jobhistory job page (Jonathon Eagles
via tgraves) via tgraves)
MAPREDUCE-3738. MM can hang during shutdown if AppLogAggregatorImpl thread
dies unexpectedly (Jason Lowe via sseth)
Release 0.23.1 - 2012-02-17 Release 0.23.1 - 2012-02-17
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -133,8 +133,16 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
} }
@Override @Override
@SuppressWarnings("unchecked")
public void run() { public void run() {
try {
doAppLogAggregation();
} finally {
this.appAggregationFinished.set(true);
}
}
@SuppressWarnings("unchecked")
private void doAppLogAggregation() {
ContainerId containerId; ContainerId containerId;
while (!this.appFinishing.get()) { while (!this.appFinishing.get()) {
@ -189,8 +197,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
this.dispatcher.getEventHandler().handle( this.dispatcher.getEventHandler().handle(
new ApplicationEvent(this.appId, new ApplicationEvent(this.appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)); ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
this.appAggregationFinished.set(true);
} }
private Path getRemoteNodeTmpLogFileForApp() { private Path getRemoteNodeTmpLogFileForApp() {

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregatio
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertTrue; import static junit.framework.Assert.assertTrue;
@ -69,6 +70,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
@ -536,4 +538,31 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
appAcls.put(ApplicationAccessType.VIEW_APP, "*"); appAcls.put(ApplicationAccessType.VIEW_APP, "*");
return appAcls; return appAcls;
} }
@Test(timeout=20000)
@SuppressWarnings("unchecked")
public void testStopAfterError() throws Exception {
DeletionService delSrvc = mock(DeletionService.class);
// get the AppLogAggregationImpl thread to crash
LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class);
when(mockedDirSvc.getLogDirs()).thenThrow(new RuntimeException());
DrainDispatcher dispatcher = createDispatcher();
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
LogAggregationService logAggregationService =
new LogAggregationService(dispatcher, this.context, delSrvc,
mockedDirSvc);
logAggregationService.init(this.conf);
logAggregationService.start();
ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
logAggregationService.handle(new LogHandlerAppStartedEvent(
application1, this.user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
logAggregationService.stop();
}
} }