YARN-3850. NM fails to read files from full disks which can lead to container logs being lost and other issues. Contributed by Varun Saxena
(cherry picked from commit 40b256949ad6f6e0dbdd248f2d257b05899f4332)
This commit is contained in:
parent
55427fb66c
commit
0221d19f4e
@ -672,6 +672,9 @@ Release 2.7.1 - UNRELEASED
|
||||
YARN-3832. Resource Localization fails on a cluster due to existing cache
|
||||
directories (Brahma Reddy Battula via jlowe)
|
||||
|
||||
YARN-3850. NM fails to read files from full disks which can lead to
|
||||
container logs being lost and other issues (Varun Saxena via jlowe)
|
||||
|
||||
Release 2.7.0 - 2015-04-20
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -237,6 +237,18 @@ public List<String> getDiskFullLogDirs() {
|
||||
return logDirs.getFullDirs();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the local dirs that should be consulted when reading files that
|
||||
* already exist on disk. Combines the good local dirs with the local dirs
|
||||
* that have reached the disk space limit, so files on full disks are not skipped.
|
||||
*
|
||||
* @return the good local dirs concatenated with the full local dirs
|
||||
*/
|
||||
public List<String> getLocalDirsForRead() {
|
||||
return DirectoryCollection.concat(localDirs.getGoodDirs(),
|
||||
localDirs.getFullDirs());
|
||||
}
|
||||
|
||||
/**
|
||||
* Function to get the local dirs which should be considered when cleaning up
|
||||
* resources. Contains the good local dirs and the local dirs that have reached
|
||||
@ -249,6 +261,18 @@ public List<String> getLocalDirsForCleanup() {
|
||||
localDirs.getFullDirs());
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the log dirs that should be consulted when reading files that
|
||||
* already exist on disk. Combines the good log dirs with the log dirs that
|
||||
* have reached the disk space limit, so logs on full disks are not skipped.
|
||||
*
|
||||
* @return the good log dirs concatenated with the full log dirs
|
||||
*/
|
||||
public List<String> getLogDirsForRead() {
|
||||
return DirectoryCollection.concat(logDirs.getGoodDirs(),
|
||||
logDirs.getFullDirs());
|
||||
}
|
||||
|
||||
/**
|
||||
* Function to get the log dirs which should be considered when cleaning up
|
||||
* resources. Contains the good log dirs and the log dirs that have reached
|
||||
|
@ -126,7 +126,8 @@ public Integer call() {
|
||||
|
||||
private File locatePidFile(String appIdStr, String containerIdStr) {
|
||||
String pidSubpath= getPidFileSubpath(appIdStr, containerIdStr);
|
||||
for (String dir : getContext().getLocalDirsHandler().getLocalDirs()) {
|
||||
for (String dir : getContext().getLocalDirsHandler().
|
||||
getLocalDirsForRead()) {
|
||||
File pidFile = new File(dir, pidSubpath);
|
||||
if (pidFile.exists()) {
|
||||
return pidFile;
|
||||
|
@ -595,10 +595,10 @@ public Set<Path> doContainerLogAggregation(LogWriter writer,
|
||||
boolean appFinished) {
|
||||
LOG.info("Uploading logs for container " + containerId
|
||||
+ ". Current good log dirs are "
|
||||
+ StringUtils.join(",", dirsHandler.getLogDirs()));
|
||||
+ StringUtils.join(",", dirsHandler.getLogDirsForRead()));
|
||||
final LogKey logKey = new LogKey(containerId);
|
||||
final LogValue logValue =
|
||||
new LogValue(dirsHandler.getLogDirs(), containerId,
|
||||
new LogValue(dirsHandler.getLogDirsForRead(), containerId,
|
||||
userUgi.getShortUserName(), logAggregationContext,
|
||||
this.uploadedFileMeta, appFinished);
|
||||
try {
|
||||
|
@ -74,7 +74,7 @@ public static List<File> getContainerLogDirs(ContainerId containerId,
|
||||
|
||||
static List<File> getContainerLogDirs(ContainerId containerId,
|
||||
LocalDirsHandlerService dirsHandler) throws YarnException {
|
||||
List<String> logDirs = dirsHandler.getLogDirs();
|
||||
List<String> logDirs = dirsHandler.getLogDirsForRead();
|
||||
List<File> containerLogDirs = new ArrayList<File>(logDirs.size());
|
||||
for (String logDir : logDirs) {
|
||||
logDir = new File(logDir).toURI().getPath();
|
||||
|
@ -177,22 +177,11 @@ public void tearDown() throws IOException, InterruptedException {
|
||||
dispatcher.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLocalFileDeletionAfterUpload() throws Exception {
|
||||
this.delSrvc = new DeletionService(createContainerExecutor());
|
||||
delSrvc = spy(delSrvc);
|
||||
this.delSrvc.init(conf);
|
||||
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
||||
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||
this.remoteRootLogDir.getAbsolutePath());
|
||||
|
||||
LogAggregationService logAggregationService = spy(
|
||||
new LogAggregationService(dispatcher, this.context, this.delSrvc,
|
||||
super.dirsHandler));
|
||||
private void verifyLocalFileDeletion(
|
||||
LogAggregationService logAggregationService) throws Exception {
|
||||
logAggregationService.init(this.conf);
|
||||
logAggregationService.start();
|
||||
|
||||
|
||||
ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
|
||||
|
||||
// AppLogDir should be created
|
||||
@ -252,9 +241,46 @@ public void testLocalFileDeletionAfterUpload() throws Exception {
|
||||
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)
|
||||
};
|
||||
|
||||
checkEvents(appEventHandler, expectedEvents, true, "getType", "getApplicationID");
|
||||
checkEvents(appEventHandler, expectedEvents, true, "getType",
|
||||
"getApplicationID");
|
||||
}
|
||||
|
||||
/**
 * Runs the shared local-file-deletion check with the unmodified dirs handler.
 * Builds a spied DeletionService and a spied LogAggregationService backed by
 * {@code super.dirsHandler}, then delegates to verifyLocalFileDeletion.
 */
@Test
|
||||
public void testLocalFileDeletionAfterUpload() throws Exception {
|
||||
this.delSrvc = new DeletionService(createContainerExecutor());
|
||||
delSrvc = spy(delSrvc);
|
||||
this.delSrvc.init(conf);
|
||||
// Point the NM log dir and the remote aggregation dir at the test directories.
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
||||
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||
this.remoteRootLogDir.getAbsolutePath());
|
||||
|
||||
LogAggregationService logAggregationService = spy(
|
||||
new LogAggregationService(dispatcher, this.context, this.delSrvc,
|
||||
super.dirsHandler));
|
||||
verifyLocalFileDeletion(logAggregationService);
|
||||
}
|
||||
|
||||
/**
 * Runs the shared local-file-deletion check while simulating a full log disk.
 * The dirs handler is spied so that getLogDirs() returns an empty list while
 * getLogDirsForRead() still returns the real log dirs, then the check is
 * delegated to verifyLocalFileDeletion.
 */
@Test
|
||||
public void testLocalFileDeletionOnDiskFull() throws Exception {
|
||||
this.delSrvc = new DeletionService(createContainerExecutor());
|
||||
delSrvc = spy(delSrvc);
|
||||
this.delSrvc.init(conf);
|
||||
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
||||
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||
this.remoteRootLogDir.getAbsolutePath());
|
||||
// Capture the real log dirs before stubbing; they are served below as the readable dirs.
List<String> logDirs = super.dirsHandler.getLogDirs();
|
||||
LocalDirsHandlerService dirsHandler = spy(super.dirsHandler);
|
||||
// Simulate disk being full by returning no good log dirs but having a
|
||||
// directory in full log dirs.
|
||||
when(dirsHandler.getLogDirs()).thenReturn(new ArrayList<String>());
|
||||
when(dirsHandler.getLogDirsForRead()).thenReturn(logDirs);
|
||||
LogAggregationService logAggregationService = spy(
|
||||
new LogAggregationService(dispatcher, this.context, this.delSrvc,
|
||||
dirsHandler));
|
||||
verifyLocalFileDeletion(logAggregationService);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testNoContainerOnNode() throws Exception {
|
||||
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
||||
|
@ -20,6 +20,7 @@
|
||||
|
||||
import static org.junit.Assume.assumeTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
@ -29,6 +30,7 @@
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -122,6 +124,24 @@ public void testContainerLogDirs() throws IOException, YarnException {
|
||||
Assert.assertNull(nmContext.getContainers().get(container1));
|
||||
files = ContainerLogsUtils.getContainerLogDirs(container1, user, nmContext);
|
||||
Assert.assertTrue(!(files.get(0).toString().contains("file:")));
|
||||
|
||||
// Create a new context to check if correct container log dirs are fetched
|
||||
// on full disk.
|
||||
LocalDirsHandlerService dirsHandlerForFullDisk = spy(dirsHandler);
|
||||
// good log dirs are empty and nm log dir is in the full log dir list.
|
||||
when(dirsHandlerForFullDisk.getLogDirs()).
|
||||
thenReturn(new ArrayList<String>());
|
||||
when(dirsHandlerForFullDisk.getLogDirsForRead()).
|
||||
thenReturn(Arrays.asList(new String[] {absLogDir.getAbsolutePath()}));
|
||||
nmContext = new NodeManager.NMContext(null, null, dirsHandlerForFullDisk,
|
||||
new ApplicationACLsManager(conf), new NMNullStateStoreService());
|
||||
nmContext.getApplications().put(appId, app);
|
||||
container.setState(ContainerState.RUNNING);
|
||||
nmContext.getContainers().put(container1, container);
|
||||
List<File> dirs =
|
||||
ContainerLogsUtils.getContainerLogDirs(container1, user, nmContext);
|
||||
File containerLogDir = new File(absLogDir, appId + "/" + container1);
|
||||
Assert.assertTrue(dirs.contains(containerLogDir));
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
@ -231,7 +251,7 @@ public void testLogDirWithDriveLetter() throws Exception {
|
||||
LocalDirsHandlerService localDirs = mock(LocalDirsHandlerService.class);
|
||||
List<String> logDirs = new ArrayList<String>();
|
||||
logDirs.add("F:/nmlogs");
|
||||
when(localDirs.getLogDirs()).thenReturn(logDirs);
|
||||
when(localDirs.getLogDirsForRead()).thenReturn(logDirs);
|
||||
|
||||
ApplicationIdPBImpl appId = mock(ApplicationIdPBImpl.class);
|
||||
when(appId.toString()).thenReturn("app_id_1");
|
||||
|
Loading…
x
Reference in New Issue
Block a user