YARN-3850. NM fails to read files from full disks which can lead to container logs being lost and other issues. Contributed by Varun Saxena
(cherry picked from commit40b256949a
) (cherry picked from commit0221d19f4e
) (cherry picked from commit 87d2204f28f192a964c04a5fa1e2e31644d74b59)
This commit is contained in:
parent
34739fc91e
commit
fe5877a49e
|
@ -159,6 +159,9 @@ Release 2.6.1 - UNRELEASED
|
|||
YARN-3832. Resource Localization fails on a cluster due to existing cache
|
||||
directories (Brahma Reddy Battula via jlowe)
|
||||
|
||||
YARN-3850. NM fails to read files from full disks which can lead to
|
||||
container logs being lost and other issues (Varun Saxena via jlowe)
|
||||
|
||||
Release 2.6.0 - 2014-11-18
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -212,6 +212,18 @@ public class LocalDirsHandlerService extends AbstractService {
|
|||
return logDirs.getFullDirs();
|
||||
}
|
||||
|
||||
/**
|
||||
* Function to get the local dirs which should be considered for reading
|
||||
* existing files on disk. Contains the good local dirs and the local dirs
|
||||
* that have reached the disk space limit
|
||||
*
|
||||
* @return the local dirs which should be considered for reading
|
||||
*/
|
||||
public List<String> getLocalDirsForRead() {
|
||||
return DirectoryCollection.concat(localDirs.getGoodDirs(),
|
||||
localDirs.getFullDirs());
|
||||
}
|
||||
|
||||
/**
|
||||
* Function to get the local dirs which should be considered when cleaning up
|
||||
* resources. Contains the good local dirs and the local dirs that have reached
|
||||
|
@ -224,6 +236,18 @@ public class LocalDirsHandlerService extends AbstractService {
|
|||
localDirs.getFullDirs());
|
||||
}
|
||||
|
||||
/**
|
||||
* Function to get the log dirs which should be considered for reading
|
||||
* existing files on disk. Contains the good log dirs and the log dirs that
|
||||
* have reached the disk space limit
|
||||
*
|
||||
* @return the log dirs which should be considered for reading
|
||||
*/
|
||||
public List<String> getLogDirsForRead() {
|
||||
return DirectoryCollection.concat(logDirs.getGoodDirs(),
|
||||
logDirs.getFullDirs());
|
||||
}
|
||||
|
||||
/**
|
||||
* Function to get the log dirs which should be considered when cleaning up
|
||||
* resources. Contains the good log dirs and the log dirs that have reached
|
||||
|
|
|
@ -121,7 +121,8 @@ public class RecoveredContainerLaunch extends ContainerLaunch {
|
|||
|
||||
private File locatePidFile(String appIdStr, String containerIdStr) {
|
||||
String pidSubpath= getPidFileSubpath(appIdStr, containerIdStr);
|
||||
for (String dir : getContext().getLocalDirsHandler().getLocalDirs()) {
|
||||
for (String dir : getContext().getLocalDirsHandler().
|
||||
getLocalDirsForRead()) {
|
||||
File pidFile = new File(dir, pidSubpath);
|
||||
if (pidFile.exists()) {
|
||||
return pidFile;
|
||||
|
|
|
@ -524,10 +524,10 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
|||
public Set<Path> doContainerLogAggregation(LogWriter writer) {
|
||||
LOG.info("Uploading logs for container " + containerId
|
||||
+ ". Current good log dirs are "
|
||||
+ StringUtils.join(",", dirsHandler.getLogDirs()));
|
||||
+ StringUtils.join(",", dirsHandler.getLogDirsForRead()));
|
||||
final LogKey logKey = new LogKey(containerId);
|
||||
final LogValue logValue =
|
||||
new LogValue(dirsHandler.getLogDirs(), containerId,
|
||||
new LogValue(dirsHandler.getLogDirsForRead(), containerId,
|
||||
userUgi.getShortUserName(), logAggregationContext,
|
||||
this.uploadedFileMeta);
|
||||
try {
|
||||
|
|
|
@ -74,7 +74,7 @@ public class ContainerLogsUtils {
|
|||
|
||||
static List<File> getContainerLogDirs(ContainerId containerId,
|
||||
LocalDirsHandlerService dirsHandler) throws YarnException {
|
||||
List<String> logDirs = dirsHandler.getLogDirs();
|
||||
List<String> logDirs = dirsHandler.getLogDirsForRead();
|
||||
List<File> containerLogDirs = new ArrayList<File>(logDirs.size());
|
||||
for (String logDir : logDirs) {
|
||||
logDir = new File(logDir).toURI().getPath();
|
||||
|
|
|
@ -172,22 +172,11 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|||
dispatcher.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLocalFileDeletionAfterUpload() throws Exception {
|
||||
this.delSrvc = new DeletionService(createContainerExecutor());
|
||||
delSrvc = spy(delSrvc);
|
||||
this.delSrvc.init(conf);
|
||||
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
||||
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||
this.remoteRootLogDir.getAbsolutePath());
|
||||
|
||||
LogAggregationService logAggregationService = spy(
|
||||
new LogAggregationService(dispatcher, this.context, this.delSrvc,
|
||||
super.dirsHandler));
|
||||
private void verifyLocalFileDeletion(
|
||||
LogAggregationService logAggregationService) throws Exception {
|
||||
logAggregationService.init(this.conf);
|
||||
logAggregationService.start();
|
||||
|
||||
|
||||
ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
|
||||
|
||||
// AppLogDir should be created
|
||||
|
@ -247,10 +236,47 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|||
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)
|
||||
};
|
||||
|
||||
checkEvents(appEventHandler, expectedEvents, true, "getType", "getApplicationID");
|
||||
checkEvents(appEventHandler, expectedEvents, true, "getType",
|
||||
"getApplicationID");
|
||||
dispatcher.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLocalFileDeletionAfterUpload() throws Exception {
|
||||
this.delSrvc = new DeletionService(createContainerExecutor());
|
||||
delSrvc = spy(delSrvc);
|
||||
this.delSrvc.init(conf);
|
||||
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
||||
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||
this.remoteRootLogDir.getAbsolutePath());
|
||||
|
||||
LogAggregationService logAggregationService = spy(
|
||||
new LogAggregationService(dispatcher, this.context, this.delSrvc,
|
||||
super.dirsHandler));
|
||||
verifyLocalFileDeletion(logAggregationService);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLocalFileDeletionOnDiskFull() throws Exception {
|
||||
this.delSrvc = new DeletionService(createContainerExecutor());
|
||||
delSrvc = spy(delSrvc);
|
||||
this.delSrvc.init(conf);
|
||||
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
||||
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||
this.remoteRootLogDir.getAbsolutePath());
|
||||
List<String> logDirs = super.dirsHandler.getLogDirs();
|
||||
LocalDirsHandlerService dirsHandler = spy(super.dirsHandler);
|
||||
// Simulate disk being full by returning no good log dirs but having a
|
||||
// directory in full log dirs.
|
||||
when(dirsHandler.getLogDirs()).thenReturn(new ArrayList<String>());
|
||||
when(dirsHandler.getLogDirsForRead()).thenReturn(logDirs);
|
||||
LogAggregationService logAggregationService = spy(
|
||||
new LogAggregationService(dispatcher, this.context, this.delSrvc,
|
||||
dirsHandler));
|
||||
verifyLocalFileDeletion(logAggregationService);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testNoContainerOnNode() throws Exception {
|
||||
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.webapp;
|
|||
|
||||
import static org.junit.Assume.assumeTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
|
@ -29,6 +30,7 @@ import java.io.FileOutputStream;
|
|||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -115,6 +117,24 @@ public class TestContainerLogsPage {
|
|||
Assert.assertNull(nmContext.getContainers().get(container1));
|
||||
files = ContainerLogsUtils.getContainerLogDirs(container1, user, nmContext);
|
||||
Assert.assertTrue(!(files.get(0).toString().contains("file:")));
|
||||
|
||||
// Create a new context to check if correct container log dirs are fetched
|
||||
// on full disk.
|
||||
LocalDirsHandlerService dirsHandlerForFullDisk = spy(dirsHandler);
|
||||
// good log dirs are empty and nm log dir is in the full log dir list.
|
||||
when(dirsHandlerForFullDisk.getLogDirs()).
|
||||
thenReturn(new ArrayList<String>());
|
||||
when(dirsHandlerForFullDisk.getLogDirsForRead()).
|
||||
thenReturn(Arrays.asList(new String[] {absLogDir.getAbsolutePath()}));
|
||||
nmContext = new NodeManager.NMContext(null, null, dirsHandlerForFullDisk,
|
||||
new ApplicationACLsManager(conf), new NMNullStateStoreService());
|
||||
nmContext.getApplications().put(appId, app);
|
||||
container.setState(ContainerState.RUNNING);
|
||||
nmContext.getContainers().put(container1, container);
|
||||
List<File> dirs =
|
||||
ContainerLogsUtils.getContainerLogDirs(container1, user, nmContext);
|
||||
File containerLogDir = new File(absLogDir, appId + "/" + container1);
|
||||
Assert.assertTrue(dirs.contains(containerLogDir));
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
|
@ -224,7 +244,7 @@ public class TestContainerLogsPage {
|
|||
LocalDirsHandlerService localDirs = mock(LocalDirsHandlerService.class);
|
||||
List<String> logDirs = new ArrayList<String>();
|
||||
logDirs.add("F:/nmlogs");
|
||||
when(localDirs.getLogDirs()).thenReturn(logDirs);
|
||||
when(localDirs.getLogDirsForRead()).thenReturn(logDirs);
|
||||
|
||||
ApplicationIdPBImpl appId = mock(ApplicationIdPBImpl.class);
|
||||
when(appId.toString()).thenReturn("app_id_1");
|
||||
|
|
Loading…
Reference in New Issue