From 726b48f51ab8f48e69a0b6bc2e8a73234aaea07a Mon Sep 17 00:00:00 2001 From: Robert Joseph Evans Date: Wed, 5 Sep 2012 19:39:53 +0000 Subject: [PATCH] YARN-68. NodeManager will refuse to shutdown indefinitely due to container log aggregation (daryn via bobby) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1381317 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 ++ .../logaggregation/AppLogAggregator.java | 3 -- .../logaggregation/AppLogAggregatorImpl.java | 22 +++------- .../logaggregation/LogAggregationService.java | 44 ++++++++++++++----- .../TestLogAggregationService.java | 18 ++++++-- 5 files changed, 56 insertions(+), 34 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index ca268f0cd49..5d593be9807 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -111,3 +111,6 @@ Release 0.23.3 - Unreleased thus causes all containers to be rejected. (vinodkv) YARN-66. aggregated logs permissions not set properly (tgraves via bobby) + + YARN-68. NodeManager will refuse to shutdown indefinitely due to container + log aggregation (daryn via bobby) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java index 71d5d9b14a6..deb6c045c83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java @@ -26,7 +26,4 @@ public interface AppLogAggregator extends Runnable { boolean wasContainerSuccessful); void finishLogAggregation(); - - void join(); - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index d2264643eb5..5f3c4dfa84a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -137,6 +137,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator { try { doAppLogAggregation(); } finally { + if (!this.appAggregationFinished.get()) { + LOG.warn("Aggregation did not complete for application " + appId); + } this.appAggregationFinished.set(true); } } @@ -155,6 +158,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator { } } catch (InterruptedException e) { LOG.warn("PendingContainers queue is interrupted"); + this.appFinishing.set(true); } } @@ -197,6 +201,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator { this.dispatcher.getEventHandler().handle( new ApplicationEvent(this.appId, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)); + this.appAggregationFinished.set(true); } private Path getRemoteNodeTmpLogFileForApp() { @@ -250,21 +255,4 @@ public class AppLogAggregatorImpl implements AppLogAggregator { LOG.info("Application just finished : " + this.applicationId); this.appFinishing.set(true); } - - @Override - public void join() { - // Aggregation service is finishing - this.finishLogAggregation(); - - while (!this.appAggregationFinished.get()) { - LOG.info("Waiting for aggregation to complete for " - + this.applicationId); - try { - Thread.sleep(THREAD_SLEEP_TIME); - } catch (InterruptedException e) { - LOG.warn("Join interrupted. Some logs may not have been aggregated!!"); - break; - } - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index f2a9b5f49ea..69c981e198e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,8 +36,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -137,11 +136,33 @@ public class LogAggregationService extends AbstractService implements @Override public synchronized void stop() { LOG.info(this.getName() + " waiting for pending aggregation during exit"); - for (AppLogAggregator appLogAggregator : this.appLogAggregators.values()) { - appLogAggregator.join(); - } + stopAggregators(); super.stop(); } + + private void stopAggregators() { + threadPool.shutdown(); + // politely ask to finish + for (AppLogAggregator aggregator : appLogAggregators.values()) { + aggregator.finishLogAggregation(); + } + while (!threadPool.isTerminated()) { // wait for all threads to finish + for (ApplicationId appId : appLogAggregators.keySet()) { + LOG.info("Waiting for aggregation to complete for " + appId); + } + try { + if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) { + threadPool.shutdownNow(); // send interrupt to hurry them along + } + } catch (InterruptedException e) { + LOG.warn("Aggregation stop interrupted!"); + break; + } + } + for (ApplicationId appId : appLogAggregators.keySet()) { + LOG.warn("Some logs may not have been aggregated for " + appId); + } + } private void verifyAndCreateRemoteLogDir(Configuration conf) { // Checking the existance of the TLD @@ -293,10 +314,7 @@ public class LogAggregationService extends AbstractService implements final UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user); if (credentials != null) { - for (Token token : credentials - .getAllTokens()) { - userUgi.addToken(token); - } + userUgi.addCredentials(credentials); } // New application @@ -312,9 +330,13 @@ public class LogAggregationService extends AbstractService implements try { // Create the app dir createAppDir(user, appId, userUgi); - } catch (YarnException e) { + } catch (Exception e) { + appLogAggregators.remove(appId); closeFileSystems(userUgi); - throw e; + if (!(e instanceof YarnException)) { + e = new YarnException(e); + } + throw (YarnException)e; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 885855cbc2f..74a3858a5d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -157,14 +157,18 @@ public class TestLogAggregationService extends BaseContainerManagerTest { application1)); logAggregationService.stop(); + assertEquals(0, logAggregationService.getNumAggregators()); // ensure filesystems were closed verify(logAggregationService).closeFileSystems( any(UserGroupInformation.class)); + delSrvc.stop(); + String containerIdStr = ConverterUtils.toString(container11); File containerLogDir = new File(app1LogDir, containerIdStr); for (String fileType : new String[] { "stdout", "stderr", "syslog" }) { - Assert.assertFalse(new File(containerLogDir, fileType).exists()); + File f = new File(containerLogDir, fileType); + Assert.assertFalse("check "+f, f.exists()); } Assert.assertFalse(app1LogDir.exists()); @@ -222,6 +226,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { application1)); logAggregationService.stop(); + assertEquals(0, logAggregationService.getNumAggregators()); Assert.assertFalse(new File(logAggregationService .getRemoteNodeLogFileForApp(application1, this.user).toUri().getPath()) @@ -356,6 +361,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { application1)); logAggregationService.stop(); + assertEquals(0, logAggregationService.getNumAggregators()); verifyContainerLogs(logAggregationService, application1, new ContainerId[] { container11, container12 }); @@ -454,7 +460,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest { ApplicationId appId = BuilderUtils.newApplicationId( System.currentTimeMillis(), (int)Math.random()); - doThrow(new YarnException("KABOOM!")) + Exception e = new RuntimeException("KABOOM!"); + doThrow(e) .when(logAggregationService).createAppDir(any(String.class), any(ApplicationId.class), any(UserGroupInformation.class)); logAggregationService.handle(new LogHandlerAppStartedEvent(appId, @@ -463,7 +470,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest { dispatcher.await(); ApplicationEvent expectedEvents[] = new ApplicationEvent[]{ - new ApplicationFinishEvent(appId, "Application failed to init aggregation: KABOOM!") + new ApplicationFinishEvent(appId, + "Application failed to init aggregation: "+e) }; checkEvents(appEventHandler, expectedEvents, false, "getType", "getApplicationID", "getDiagnostic"); @@ -479,6 +487,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest { logAggregationService.handle(new LogHandlerAppFinishedEvent( BuilderUtils.newApplicationId(1, 5))); dispatcher.await(); + + logAggregationService.stop(); + assertEquals(0, logAggregationService.getNumAggregators()); } private void writeContainerLogs(File appLogDir, ContainerId containerId) @@ -690,6 +701,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls)); logAggregationService.stop(); + assertEquals(0, logAggregationService.getNumAggregators()); } @Test