YARN-24. Nodemanager fails to start if log aggregation enabled and namenode unavailable. (sandyr via tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1461891 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fc5fd80e9f
commit
967b62f5d2
|
@ -152,6 +152,9 @@ Release 2.0.5-beta - UNRELEASED
|
||||||
YARN-209. Fix CapacityScheduler to trigger application-activation when
|
YARN-209. Fix CapacityScheduler to trigger application-activation when
|
||||||
the cluster capacity changes. (Zhijie Shen via vinodkv)
|
the cluster capacity changes. (Zhijie Shen via vinodkv)
|
||||||
|
|
||||||
|
YARN-24. Nodemanager fails to start if log aggregation enabled and
|
||||||
|
namenode unavailable. (sandyr via tucu)
|
||||||
|
|
||||||
Release 2.0.4-alpha - UNRELEASED
|
Release 2.0.4-alpha - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -50,7 +50,6 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
|
||||||
|
@ -97,7 +96,7 @@ public class LogAggregationService extends AbstractService implements
|
||||||
private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
|
private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
|
||||||
|
|
||||||
private final ExecutorService threadPool;
|
private final ExecutorService threadPool;
|
||||||
|
|
||||||
public LogAggregationService(Dispatcher dispatcher, Context context,
|
public LogAggregationService(Dispatcher dispatcher, Context context,
|
||||||
DeletionService deletionService, LocalDirsHandlerService dirsHandler) {
|
DeletionService deletionService, LocalDirsHandlerService dirsHandler) {
|
||||||
super(LogAggregationService.class.getName());
|
super(LogAggregationService.class.getName());
|
||||||
|
@ -129,7 +128,6 @@ public class LogAggregationService extends AbstractService implements
|
||||||
// NodeId is only available during start, the following cannot be moved
|
// NodeId is only available during start, the following cannot be moved
|
||||||
// anywhere else.
|
// anywhere else.
|
||||||
this.nodeId = this.context.getNodeId();
|
this.nodeId = this.context.getNodeId();
|
||||||
verifyAndCreateRemoteLogDir(getConfig());
|
|
||||||
super.start();
|
super.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,7 +162,7 @@ public class LogAggregationService extends AbstractService implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifyAndCreateRemoteLogDir(Configuration conf) {
|
void verifyAndCreateRemoteLogDir(Configuration conf) {
|
||||||
// Checking the existance of the TLD
|
// Checking the existance of the TLD
|
||||||
FileSystem remoteFS = null;
|
FileSystem remoteFS = null;
|
||||||
try {
|
try {
|
||||||
|
@ -177,7 +175,7 @@ public class LogAggregationService extends AbstractService implements
|
||||||
remoteExists = remoteFS.exists(this.remoteRootLogDir);
|
remoteExists = remoteFS.exists(this.remoteRootLogDir);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new YarnException("Failed to check for existence of remoteLogDir ["
|
throw new YarnException("Failed to check for existence of remoteLogDir ["
|
||||||
+ this.remoteRootLogDir + "]");
|
+ this.remoteRootLogDir + "]", e);
|
||||||
}
|
}
|
||||||
if (remoteExists) {
|
if (remoteExists) {
|
||||||
try {
|
try {
|
||||||
|
@ -191,8 +189,8 @@ public class LogAggregationService extends AbstractService implements
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new YarnException(
|
throw new YarnException(
|
||||||
"Failed while attempting to check permissions for dir ["
|
"Failed to check permissions for dir ["
|
||||||
+ this.remoteRootLogDir + "]");
|
+ this.remoteRootLogDir + "]", e);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
|
LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
|
||||||
|
@ -208,7 +206,6 @@ public class LogAggregationService extends AbstractService implements
|
||||||
+ this.remoteRootLogDir + "]", e);
|
+ this.remoteRootLogDir + "]", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Path getRemoteNodeLogFileForApp(ApplicationId appId, String user) {
|
Path getRemoteNodeLogFileForApp(ApplicationId appId, String user) {
|
||||||
|
@ -296,6 +293,7 @@ public class LogAggregationService extends AbstractService implements
|
||||||
Map<ApplicationAccessType, String> appAcls) {
|
Map<ApplicationAccessType, String> appAcls) {
|
||||||
ApplicationEvent eventResponse;
|
ApplicationEvent eventResponse;
|
||||||
try {
|
try {
|
||||||
|
verifyAndCreateRemoteLogDir(getConfig());
|
||||||
initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls);
|
initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls);
|
||||||
eventResponse = new ApplicationEvent(appId,
|
eventResponse = new ApplicationEvent(appId,
|
||||||
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
|
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
|
||||||
|
|
|
@ -44,6 +44,7 @@ import junit.framework.Assert;
|
||||||
|
|
||||||
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
import org.apache.hadoop.io.DataInputBuffer;
|
import org.apache.hadoop.io.DataInputBuffer;
|
||||||
|
@ -79,7 +80,6 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
|
||||||
|
@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.ArgumentCaptor;
|
import org.mockito.ArgumentCaptor;
|
||||||
|
import org.mockito.Mockito;
|
||||||
import org.mortbay.util.MultiException;
|
import org.mortbay.util.MultiException;
|
||||||
|
|
||||||
|
|
||||||
|
@ -393,7 +394,76 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public void testLogAggregationInitFailsWithoutKillingNM() throws Exception {
|
public void testVerifyAndCreateRemoteDirsFailure()
|
||||||
|
throws Exception {
|
||||||
|
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
||||||
|
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||||
|
this.remoteRootLogDir.getAbsolutePath());
|
||||||
|
|
||||||
|
DrainDispatcher dispatcher = createDispatcher();
|
||||||
|
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
|
||||||
|
dispatcher.register(ApplicationEventType.class, appEventHandler);
|
||||||
|
|
||||||
|
LogAggregationService logAggregationService = spy(
|
||||||
|
new LogAggregationService(dispatcher, this.context, this.delSrvc,
|
||||||
|
super.dirsHandler));
|
||||||
|
logAggregationService.init(this.conf);
|
||||||
|
|
||||||
|
YarnException e = new YarnException("KABOOM!");
|
||||||
|
doThrow(e)
|
||||||
|
.when(logAggregationService).verifyAndCreateRemoteLogDir(
|
||||||
|
any(Configuration.class));
|
||||||
|
|
||||||
|
logAggregationService.start();
|
||||||
|
|
||||||
|
// Now try to start an application
|
||||||
|
ApplicationId appId = BuilderUtils.newApplicationId(
|
||||||
|
System.currentTimeMillis(), (int)Math.random());
|
||||||
|
logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
|
||||||
|
this.user, null,
|
||||||
|
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
|
||||||
|
this.acls));
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
|
// Verify that it failed
|
||||||
|
ApplicationEvent[] expectedEvents = new ApplicationEvent[] {
|
||||||
|
new ApplicationEvent(appId,
|
||||||
|
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)
|
||||||
|
};
|
||||||
|
checkEvents(appEventHandler, expectedEvents, false,
|
||||||
|
"getType", "getApplicationID", "getDiagnostic");
|
||||||
|
|
||||||
|
Mockito.reset(logAggregationService);
|
||||||
|
|
||||||
|
// Now try to start another one
|
||||||
|
ApplicationId appId2 = BuilderUtils.newApplicationId(
|
||||||
|
System.currentTimeMillis(), (int)Math.random());
|
||||||
|
File appLogDir =
|
||||||
|
new File(localLogDir, ConverterUtils.toString(appId2));
|
||||||
|
appLogDir.mkdir();
|
||||||
|
|
||||||
|
logAggregationService.handle(new LogHandlerAppStartedEvent(appId2,
|
||||||
|
this.user, null,
|
||||||
|
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
|
||||||
|
this.acls));
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
|
// Verify that it worked
|
||||||
|
expectedEvents = new ApplicationEvent[] {
|
||||||
|
new ApplicationEvent(appId, // original failure
|
||||||
|
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED),
|
||||||
|
new ApplicationEvent(appId2, // success
|
||||||
|
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED)
|
||||||
|
};
|
||||||
|
checkEvents(appEventHandler, expectedEvents, false,
|
||||||
|
"getType", "getApplicationID", "getDiagnostic");
|
||||||
|
|
||||||
|
logAggregationService.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void testLogAggregationInitAppFailsWithoutKillingNM() throws Exception {
|
||||||
|
|
||||||
this.conf.set(YarnConfiguration.NM_LOG_DIRS,
|
this.conf.set(YarnConfiguration.NM_LOG_DIRS,
|
||||||
localLogDir.getAbsolutePath());
|
localLogDir.getAbsolutePath());
|
||||||
|
|
Loading…
Reference in New Issue