YARN-24. Nodemanager fails to start if log aggregation enabled and namenode unavailable. (sandyr via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1461892 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2013-03-27 23:15:54 +00:00
parent c427540ced
commit d80e21f7bc
3 changed files with 81 additions and 10 deletions

View File

@ -97,6 +97,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-209. Fix CapacityScheduler to trigger application-activation when YARN-209. Fix CapacityScheduler to trigger application-activation when
the cluster capacity changes. (Zhijie Shen via vinodkv) the cluster capacity changes. (Zhijie Shen via vinodkv)
YARN-24. Nodemanager fails to start if log aggregation enabled and
namenode unavailable. (sandyr via tucu)
Release 2.0.4-alpha - UNRELEASED Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -50,7 +50,6 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
@ -129,7 +128,6 @@ public class LogAggregationService extends AbstractService implements
// NodeId is only available during start, the following cannot be moved // NodeId is only available during start, the following cannot be moved
// anywhere else. // anywhere else.
this.nodeId = this.context.getNodeId(); this.nodeId = this.context.getNodeId();
verifyAndCreateRemoteLogDir(getConfig());
super.start(); super.start();
} }
@ -164,7 +162,7 @@ public class LogAggregationService extends AbstractService implements
} }
} }
private void verifyAndCreateRemoteLogDir(Configuration conf) { void verifyAndCreateRemoteLogDir(Configuration conf) {
// Checking the existance of the TLD // Checking the existance of the TLD
FileSystem remoteFS = null; FileSystem remoteFS = null;
try { try {
@ -177,7 +175,7 @@ public class LogAggregationService extends AbstractService implements
remoteExists = remoteFS.exists(this.remoteRootLogDir); remoteExists = remoteFS.exists(this.remoteRootLogDir);
} catch (IOException e) { } catch (IOException e) {
throw new YarnException("Failed to check for existence of remoteLogDir [" throw new YarnException("Failed to check for existence of remoteLogDir ["
+ this.remoteRootLogDir + "]"); + this.remoteRootLogDir + "]", e);
} }
if (remoteExists) { if (remoteExists) {
try { try {
@ -191,8 +189,8 @@ public class LogAggregationService extends AbstractService implements
} }
} catch (IOException e) { } catch (IOException e) {
throw new YarnException( throw new YarnException(
"Failed while attempting to check permissions for dir [" "Failed to check permissions for dir ["
+ this.remoteRootLogDir + "]"); + this.remoteRootLogDir + "]", e);
} }
} else { } else {
LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
@ -208,7 +206,6 @@ public class LogAggregationService extends AbstractService implements
+ this.remoteRootLogDir + "]", e); + this.remoteRootLogDir + "]", e);
} }
} }
} }
Path getRemoteNodeLogFileForApp(ApplicationId appId, String user) { Path getRemoteNodeLogFileForApp(ApplicationId appId, String user) {
@ -296,6 +293,7 @@ public class LogAggregationService extends AbstractService implements
Map<ApplicationAccessType, String> appAcls) { Map<ApplicationAccessType, String> appAcls) {
ApplicationEvent eventResponse; ApplicationEvent eventResponse;
try { try {
verifyAndCreateRemoteLogDir(getConfig());
initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls); initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls);
eventResponse = new ApplicationEvent(appId, eventResponse = new ApplicationEvent(appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED); ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);

View File

@ -44,6 +44,7 @@ import junit.framework.Assert;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataInputBuffer;
@ -79,7 +80,6 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mortbay.util.MultiException; import org.mortbay.util.MultiException;
@ -393,7 +394,76 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
@Test @Test
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testLogAggregationInitFailsWithoutKillingNM() throws Exception { public void testVerifyAndCreateRemoteDirsFailure()
throws Exception {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
DrainDispatcher dispatcher = createDispatcher();
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
LogAggregationService logAggregationService = spy(
new LogAggregationService(dispatcher, this.context, this.delSrvc,
super.dirsHandler));
logAggregationService.init(this.conf);
YarnException e = new YarnException("KABOOM!");
doThrow(e)
.when(logAggregationService).verifyAndCreateRemoteLogDir(
any(Configuration.class));
logAggregationService.start();
// Now try to start an application
ApplicationId appId = BuilderUtils.newApplicationId(
System.currentTimeMillis(), (int)Math.random());
logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
this.user, null,
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
this.acls));
dispatcher.await();
// Verify that it failed
ApplicationEvent[] expectedEvents = new ApplicationEvent[] {
new ApplicationEvent(appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)
};
checkEvents(appEventHandler, expectedEvents, false,
"getType", "getApplicationID", "getDiagnostic");
Mockito.reset(logAggregationService);
// Now try to start another one
ApplicationId appId2 = BuilderUtils.newApplicationId(
System.currentTimeMillis(), (int)Math.random());
File appLogDir =
new File(localLogDir, ConverterUtils.toString(appId2));
appLogDir.mkdir();
logAggregationService.handle(new LogHandlerAppStartedEvent(appId2,
this.user, null,
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
this.acls));
dispatcher.await();
// Verify that it worked
expectedEvents = new ApplicationEvent[] {
new ApplicationEvent(appId, // original failure
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED),
new ApplicationEvent(appId2, // success
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED)
};
checkEvents(appEventHandler, expectedEvents, false,
"getType", "getApplicationID", "getDiagnostic");
logAggregationService.stop();
}
@Test
@SuppressWarnings("unchecked")
public void testLogAggregationInitAppFailsWithoutKillingNM() throws Exception {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, this.conf.set(YarnConfiguration.NM_LOG_DIRS,
localLogDir.getAbsolutePath()); localLogDir.getAbsolutePath());