diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 119acf585ff..58a12b90864 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -846,6 +846,9 @@ Release 2.7.2 - UNRELEASED YARN-4087. Followup fixes after YARN-2019 regarding RM behavior when state-store error occurs. (Jian He via xgong) + YARN-4096. App local logs are leaked if log aggregation fails to initialize + for the app. (Jason Lowe via zxu) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java index 83c5d5a6244..01786995736 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java @@ -27,4 +27,6 @@ public interface AppLogAggregator extends Runnable { void abortLogAggregation(); void finishLogAggregation(); + + void disableLogAggregation(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 742b8a98cd5..b2342c7c578 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -596,6 +596,11 @@ public class AppLogAggregatorImpl implements AppLogAggregator { this.notifyAll(); } + @Override + public void disableLogAggregation() { + this.logAggregationDisabled = true; + } + @Private @VisibleForTesting // This is only used for testing. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index 259e9ae9601..6a6f101a881 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -363,19 +363,19 @@ public class LogAggregationService extends AbstractService implements throw new YarnRuntimeException("Duplicate initApp for " + appId); } // wait until check for existing aggregator to create dirs + YarnRuntimeException appDirException = null; try { // Create the app dir createAppDir(user, appId, userUgi); } catch (Exception e) { - appLogAggregators.remove(appId); - closeFileSystems(userUgi); + appLogAggregator.disableLogAggregation(); if (!(e instanceof YarnRuntimeException)) { - e = new YarnRuntimeException(e); + appDirException = new YarnRuntimeException(e); + } else { + appDirException = (YarnRuntimeException)e; } - throw (YarnRuntimeException)e; } - // TODO Get the user configuration for the list of containers that need log // aggregation. @@ -391,6 +391,10 @@ public class LogAggregationService extends AbstractService implements } }; this.threadPool.execute(aggregatorWrapper); + + if (appDirException != null) { + throw appDirException; + } } protected void closeFileSystems(final UserGroupInformation userUgi) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 77d75cae7b8..77c6e3c42aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -731,9 +731,10 @@ public class TestLogAggregationService extends BaseContainerManagerTest { this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); - + + DeletionService spyDelSrvc = spy(this.delSrvc); LogAggregationService logAggregationService = spy( - new LogAggregationService(dispatcher, this.context, this.delSrvc, + new LogAggregationService(dispatcher, this.context, spyDelSrvc, super.dirsHandler)); logAggregationService.init(this.conf); logAggregationService.start(); @@ -741,6 +742,11 @@ public class TestLogAggregationService extends BaseContainerManagerTest { ApplicationId appId = BuilderUtils.newApplicationId(System.currentTimeMillis(), (int) (Math.random() * 1000)); + + File appLogDir = + new File(localLogDir, ConverterUtils.toString(appId)); + appLogDir.mkdir(); + Exception e = new RuntimeException("KABOOM!"); doThrow(e) .when(logAggregationService).createAppDir(any(String.class), @@ -759,9 +765,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest { }; checkEvents(appEventHandler, expectedEvents, false, "getType", "getApplicationID", "getDiagnostic"); - // filesystems may have been instantiated - verify(logAggregationService).closeFileSystems( - any(UserGroupInformation.class)); // verify trying to collect logs for containers/apps we don't know about // doesn't blow up and tear down the NM @@ -774,6 +777,10 @@ public class TestLogAggregationService extends BaseContainerManagerTest { logAggregationService.stop(); assertEquals(0, logAggregationService.getNumAggregators()); + verify(spyDelSrvc).delete(eq(user), any(Path.class), + Mockito.anyVararg()); + verify(logAggregationService).closeFileSystems( + any(UserGroupInformation.class)); } private void writeContainerLogs(File appLogDir, ContainerId containerId,