diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 77ae483207d..56528af93a3 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -152,6 +152,9 @@ Release 2.0.5-beta - UNRELEASED YARN-209. Fix CapacityScheduler to trigger application-activation when the cluster capacity changes. (Zhijie Shen via vinodkv) + YARN-24. Nodemanager fails to start if log aggregation enabled and + namenode unavailable. (sandyr via tucu) + Release 2.0.4-alpha - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index 88a01eb3854..f183d9e8aba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -50,7 +50,6 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; @@ -97,7 +96,7 @@ public class LogAggregationService extends AbstractService implements private final ConcurrentMap appLogAggregators; private final ExecutorService threadPool; - + public LogAggregationService(Dispatcher dispatcher, Context context, DeletionService deletionService, LocalDirsHandlerService dirsHandler) { super(LogAggregationService.class.getName()); @@ -129,7 +128,6 @@ public class LogAggregationService extends AbstractService implements // NodeId is only available during start, the following cannot be moved // anywhere else. this.nodeId = this.context.getNodeId(); - verifyAndCreateRemoteLogDir(getConfig()); super.start(); } @@ -164,7 +162,7 @@ public class LogAggregationService extends AbstractService implements } } - private void verifyAndCreateRemoteLogDir(Configuration conf) { + void verifyAndCreateRemoteLogDir(Configuration conf) { // Checking the existance of the TLD FileSystem remoteFS = null; try { @@ -177,7 +175,7 @@ public class LogAggregationService extends AbstractService implements remoteExists = remoteFS.exists(this.remoteRootLogDir); } catch (IOException e) { throw new YarnException("Failed to check for existence of remoteLogDir [" - + this.remoteRootLogDir + "]"); + + this.remoteRootLogDir + "]", e); } if (remoteExists) { try { @@ -191,8 +189,8 @@ public class LogAggregationService extends AbstractService implements } } catch (IOException e) { throw new YarnException( - "Failed while attempting to check permissions for dir [" - + this.remoteRootLogDir + "]"); + "Failed to check permissions for dir [" + + this.remoteRootLogDir + "]", e); } } else { LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir @@ -208,7 +206,6 @@ public class LogAggregationService extends AbstractService implements + this.remoteRootLogDir + "]", e); } } - } Path getRemoteNodeLogFileForApp(ApplicationId appId, String user) { @@ -296,6 +293,7 @@ public class LogAggregationService extends AbstractService implements Map appAcls) { ApplicationEvent eventResponse; try { + verifyAndCreateRemoteLogDir(getConfig()); initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls); eventResponse = new ApplicationEvent(appId, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 2408681ddbd..6a9a6767567 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -44,6 +44,7 @@ import junit.framework.Assert; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.io.DataInputBuffer; @@ -79,7 +80,6 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; @@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import org.mortbay.util.MultiException; @@ -393,7 +394,76 @@ public class TestLogAggregationService extends BaseContainerManagerTest { @Test @SuppressWarnings("unchecked") - public void testLogAggregationInitFailsWithoutKillingNM() throws Exception { + public void testVerifyAndCreateRemoteDirsFailure() + throws Exception { + this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); + this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + this.remoteRootLogDir.getAbsolutePath()); + + DrainDispatcher dispatcher = createDispatcher(); + EventHandler appEventHandler = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, appEventHandler); + + LogAggregationService logAggregationService = spy( + new LogAggregationService(dispatcher, this.context, this.delSrvc, + super.dirsHandler)); + logAggregationService.init(this.conf); + + YarnException e = new YarnException("KABOOM!"); + doThrow(e) + .when(logAggregationService).verifyAndCreateRemoteLogDir( + any(Configuration.class)); + + logAggregationService.start(); + + // Now try to start an application + ApplicationId appId = BuilderUtils.newApplicationId( + System.currentTimeMillis(), (int)Math.random()); + logAggregationService.handle(new LogHandlerAppStartedEvent(appId, + this.user, null, + ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, + this.acls)); + dispatcher.await(); + + // Verify that it failed + ApplicationEvent[] expectedEvents = new ApplicationEvent[] { + new ApplicationEvent(appId, + ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED) + }; + checkEvents(appEventHandler, expectedEvents, false, + "getType", "getApplicationID", "getDiagnostic"); + + Mockito.reset(logAggregationService); + + // Now try to start another one + ApplicationId appId2 = BuilderUtils.newApplicationId( + System.currentTimeMillis(), (int)Math.random()); + File appLogDir = + new File(localLogDir, ConverterUtils.toString(appId2)); + appLogDir.mkdir(); + + logAggregationService.handle(new LogHandlerAppStartedEvent(appId2, + this.user, null, + ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, + this.acls)); + dispatcher.await(); + + // Verify that it worked + expectedEvents = new ApplicationEvent[] { + new ApplicationEvent(appId, // original failure + ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED), + new ApplicationEvent(appId2, // success + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED) + }; + checkEvents(appEventHandler, expectedEvents, false, + "getType", "getApplicationID", "getDiagnostic"); + + logAggregationService.stop(); + } + + @Test + @SuppressWarnings("unchecked") + public void testLogAggregationInitAppFailsWithoutKillingNM() throws Exception { this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());