YARN-3850. NM fails to read files from full disks which can lead to container logs being lost and other issues. Contributed by Varun Saxena

This commit is contained in:
Jason Lowe 2015-06-26 15:47:07 +00:00
parent 8ef07f767f
commit 40b256949a
7 changed files with 93 additions and 19 deletions

View File

@ -716,6 +716,9 @@ Release 2.7.1 - UNRELEASED
YARN-3832. Resource Localization fails on a cluster due to existing cache
directories (Brahma Reddy Battula via jlowe)
YARN-3850. NM fails to read files from full disks which can lead to
container logs being lost and other issues (Varun Saxena via jlowe)
Release 2.7.0 - 2015-04-20
INCOMPATIBLE CHANGES

View File

@ -237,6 +237,18 @@ public class LocalDirsHandlerService extends AbstractService {
return logDirs.getFullDirs();
}
/**
* Function to get the local dirs which should be considered for reading
* existing files on disk. Contains the good local dirs and the local dirs
* that have reached the disk space limit
*
* @return the local dirs which should be considered for reading
*/
public List<String> getLocalDirsForRead() {
return DirectoryCollection.concat(localDirs.getGoodDirs(),
localDirs.getFullDirs());
}
/**
* Function to get the local dirs which should be considered when cleaning up
* resources. Contains the good local dirs and the local dirs that have reached
@ -249,6 +261,18 @@ public class LocalDirsHandlerService extends AbstractService {
localDirs.getFullDirs());
}
/**
* Function to get the log dirs which should be considered for reading
* existing files on disk. Contains the good log dirs and the log dirs that
* have reached the disk space limit
*
* @return the log dirs which should be considered for reading
*/
public List<String> getLogDirsForRead() {
return DirectoryCollection.concat(logDirs.getGoodDirs(),
logDirs.getFullDirs());
}
/**
* Function to get the log dirs which should be considered when cleaning up
* resources. Contains the good log dirs and the log dirs that have reached

View File

@ -126,7 +126,8 @@ public class RecoveredContainerLaunch extends ContainerLaunch {
private File locatePidFile(String appIdStr, String containerIdStr) {
String pidSubpath= getPidFileSubpath(appIdStr, containerIdStr);
for (String dir : getContext().getLocalDirsHandler().getLocalDirs()) {
for (String dir : getContext().getLocalDirsHandler().
getLocalDirsForRead()) {
File pidFile = new File(dir, pidSubpath);
if (pidFile.exists()) {
return pidFile;

View File

@ -595,10 +595,10 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
boolean appFinished) {
LOG.info("Uploading logs for container " + containerId
+ ". Current good log dirs are "
+ StringUtils.join(",", dirsHandler.getLogDirs()));
+ StringUtils.join(",", dirsHandler.getLogDirsForRead()));
final LogKey logKey = new LogKey(containerId);
final LogValue logValue =
new LogValue(dirsHandler.getLogDirs(), containerId,
new LogValue(dirsHandler.getLogDirsForRead(), containerId,
userUgi.getShortUserName(), logAggregationContext,
this.uploadedFileMeta, appFinished);
try {

View File

@ -74,7 +74,7 @@ public class ContainerLogsUtils {
static List<File> getContainerLogDirs(ContainerId containerId,
LocalDirsHandlerService dirsHandler) throws YarnException {
List<String> logDirs = dirsHandler.getLogDirs();
List<String> logDirs = dirsHandler.getLogDirsForRead();
List<File> containerLogDirs = new ArrayList<File>(logDirs.size());
for (String logDir : logDirs) {
logDir = new File(logDir).toURI().getPath();

View File

@ -177,22 +177,11 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
dispatcher.close();
}
@Test
public void testLocalFileDeletionAfterUpload() throws Exception {
this.delSrvc = new DeletionService(createContainerExecutor());
delSrvc = spy(delSrvc);
this.delSrvc.init(conf);
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
LogAggregationService logAggregationService = spy(
new LogAggregationService(dispatcher, this.context, this.delSrvc,
super.dirsHandler));
private void verifyLocalFileDeletion(
LogAggregationService logAggregationService) throws Exception {
logAggregationService.init(this.conf);
logAggregationService.start();
ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
// AppLogDir should be created
@ -252,9 +241,46 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)
};
checkEvents(appEventHandler, expectedEvents, true, "getType", "getApplicationID");
checkEvents(appEventHandler, expectedEvents, true, "getType",
"getApplicationID");
}
@Test
public void testLocalFileDeletionAfterUpload() throws Exception {
this.delSrvc = new DeletionService(createContainerExecutor());
delSrvc = spy(delSrvc);
this.delSrvc.init(conf);
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
LogAggregationService logAggregationService = spy(
new LogAggregationService(dispatcher, this.context, this.delSrvc,
super.dirsHandler));
verifyLocalFileDeletion(logAggregationService);
}
@Test
public void testLocalFileDeletionOnDiskFull() throws Exception {
this.delSrvc = new DeletionService(createContainerExecutor());
delSrvc = spy(delSrvc);
this.delSrvc.init(conf);
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
List<String> logDirs = super.dirsHandler.getLogDirs();
LocalDirsHandlerService dirsHandler = spy(super.dirsHandler);
// Simulate disk being full by returning no good log dirs but having a
// directory in full log dirs.
when(dirsHandler.getLogDirs()).thenReturn(new ArrayList<String>());
when(dirsHandler.getLogDirsForRead()).thenReturn(logDirs);
LogAggregationService logAggregationService = spy(
new LogAggregationService(dispatcher, this.context, this.delSrvc,
dirsHandler));
verifyLocalFileDeletion(logAggregationService);
}
@Test
public void testNoContainerOnNode() throws Exception {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.webapp;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
@ -29,6 +30,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -122,6 +124,24 @@ public class TestContainerLogsPage {
Assert.assertNull(nmContext.getContainers().get(container1));
files = ContainerLogsUtils.getContainerLogDirs(container1, user, nmContext);
Assert.assertTrue(!(files.get(0).toString().contains("file:")));
// Create a new context to check if correct container log dirs are fetched
// on full disk.
LocalDirsHandlerService dirsHandlerForFullDisk = spy(dirsHandler);
// good log dirs are empty and nm log dir is in the full log dir list.
when(dirsHandlerForFullDisk.getLogDirs()).
thenReturn(new ArrayList<String>());
when(dirsHandlerForFullDisk.getLogDirsForRead()).
thenReturn(Arrays.asList(new String[] {absLogDir.getAbsolutePath()}));
nmContext = new NodeManager.NMContext(null, null, dirsHandlerForFullDisk,
new ApplicationACLsManager(conf), new NMNullStateStoreService());
nmContext.getApplications().put(appId, app);
container.setState(ContainerState.RUNNING);
nmContext.getContainers().put(container1, container);
List<File> dirs =
ContainerLogsUtils.getContainerLogDirs(container1, user, nmContext);
File containerLogDir = new File(absLogDir, appId + "/" + container1);
Assert.assertTrue(dirs.contains(containerLogDir));
}
@Test(timeout = 10000)
@ -231,7 +251,7 @@ public class TestContainerLogsPage {
LocalDirsHandlerService localDirs = mock(LocalDirsHandlerService.class);
List<String> logDirs = new ArrayList<String>();
logDirs.add("F:/nmlogs");
when(localDirs.getLogDirs()).thenReturn(logDirs);
when(localDirs.getLogDirsForRead()).thenReturn(logDirs);
ApplicationIdPBImpl appId = mock(ApplicationIdPBImpl.class);
when(appId.toString()).thenReturn("app_id_1");