From 367dd52c165df4073996a708519a5fd03f38d122 Mon Sep 17 00:00:00 2001 From: Siddharth Seth Date: Fri, 24 Feb 2012 02:13:25 +0000 Subject: [PATCH] merge MAPREDUCE-3738 from trunk git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1293061 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../logaggregation/AppLogAggregatorImpl.java | 12 ++++++-- .../TestLogAggregationService.java | 29 +++++++++++++++++++ 3 files changed, 41 insertions(+), 3 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 3c37e2189d0..dedb3093704 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -45,6 +45,9 @@ Release 0.23.2 - UNRELEASED MAPREDUCE-3878. Null user on filtered jobhistory job page (Jonathon Eagles via tgraves) + + MAPREDUCE-3738. MM can hang during shutdown if AppLogAggregatorImpl thread + dies unexpectedly (Jason Lowe via sseth) Release 0.23.1 - 2012-02-17 diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index fdd4ecb2f63..d2264643eb5 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -133,8 +133,16 @@ public class AppLogAggregatorImpl implements AppLogAggregator { } @Override + public void run() { + try { + doAppLogAggregation(); + } finally { + this.appAggregationFinished.set(true); + } + } + @SuppressWarnings("unchecked") - public void run() { + private void doAppLogAggregation() { ContainerId containerId; while (!this.appFinishing.get()) { @@ -189,8 +197,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator { this.dispatcher.getEventHandler().handle( new ApplicationEvent(this.appId, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)); - - this.appAggregationFinished.set(true); } private Path getRemoteNodeTmpLogFileForApp() { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index a1853b307b0..18b8d9b913e 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregatio import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertTrue; @@ -69,6 +70,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; @@ -536,4 +538,31 @@ public class TestLogAggregationService extends BaseContainerManagerTest { appAcls.put(ApplicationAccessType.VIEW_APP, "*"); return appAcls; } + + @Test(timeout=20000) + @SuppressWarnings("unchecked") + public void testStopAfterError() throws Exception { + DeletionService delSrvc = mock(DeletionService.class); + + // get the AppLogAggregationImpl thread to crash + LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class); + when(mockedDirSvc.getLogDirs()).thenThrow(new RuntimeException()); + + DrainDispatcher dispatcher = createDispatcher(); + EventHandler appEventHandler = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, appEventHandler); + + LogAggregationService logAggregationService = + new LogAggregationService(dispatcher, this.context, delSrvc, + mockedDirSvc); + logAggregationService.init(this.conf); + logAggregationService.start(); + + ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1); + logAggregationService.handle(new LogHandlerAppStartedEvent( + application1, this.user, null, + ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls)); + + logAggregationService.stop(); + } }