From 6f2028bd1514d90b831f889fd0ee7f2ba5c15000 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 21 Oct 2014 17:29:22 +0000 Subject: [PATCH] YARN-90. NodeManager should identify failed disks becoming good again. Contributed by Varun Vasudev --- hadoop-yarn-project/CHANGES.txt | 3 + .../nodemanager/DirectoryCollection.java | 221 +++++++--- .../nodemanager/LocalDirsHandlerService.java | 180 +++++++-- .../nodemanager/NodeHealthCheckerService.java | 4 +- .../launcher/ContainerLaunch.java | 2 +- .../ResourceLocalizationService.java | 270 ++++++++++--- .../logaggregation/AppLogAggregatorImpl.java | 35 +- .../logaggregation/LogAggregationService.java | 13 +- .../loghandler/NonAggregatingLogHandler.java | 47 ++- .../nodemanager/TestDirectoryCollection.java | 75 +++- .../TestLocalDirsHandlerService.java | 43 +- .../nodemanager/TestNodeHealthService.java | 2 +- .../TestResourceLocalizationService.java | 312 +++++++++++++- .../TestLogAggregationService.java | 147 ++++--- .../TestNonAggregatingLogHandler.java | 382 ++++++++++++++---- 15 files changed, 1444 insertions(+), 292 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index b130ecfbf12..af056b36cd8 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -379,6 +379,9 @@ Release 2.6.0 - UNRELEASED YARN-2582. Fixed Log CLI and Web UI for showing aggregated logs of LRS. (Xuan Gong via zjshen) + YARN-90. NodeManager should identify failed disks becoming good again + (Varun Vasudev via jlowe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java index f6ee1289927..279787bdf9b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java @@ -21,18 +21,23 @@ package org.apache.hadoop.yarn.server.nodemanager; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.DiskChecker; -import org.apache.hadoop.util.DiskChecker.DiskErrorException; /** * Manages a list of local storage directories. @@ -40,9 +45,38 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException; class DirectoryCollection { private static final Log LOG = LogFactory.getLog(DirectoryCollection.class); + public enum DiskErrorCause { + DISK_FULL, OTHER + } + + static class DiskErrorInformation { + DiskErrorCause cause; + String message; + + DiskErrorInformation(DiskErrorCause cause, String message) { + this.cause = cause; + this.message = message; + } + } + + /** + * Returns a merged list which contains all the elements of l1 and l2 + * @param l1 the first list to be included + * @param l2 the second list to be included + * @return a new list containing all the elements of the first and second list + */ + static List concat(List l1, List l2) { + List ret = new ArrayList(l1.size() + l2.size()); + ret.addAll(l1); + ret.addAll(l2); + return ret; + } + // Good local storage directories private List localDirs; - private List failedDirs; + private List errorDirs; + private List fullDirs; + private int numFailures; private float diskUtilizationPercentageCutoff; @@ -109,7 +143,9 @@ class DirectoryCollection { float utilizationPercentageCutOff, long utilizationSpaceCutOff) { localDirs = new CopyOnWriteArrayList(dirs); - failedDirs = new CopyOnWriteArrayList(); + errorDirs = new CopyOnWriteArrayList(); + fullDirs = new CopyOnWriteArrayList(); + diskUtilizationPercentageCutoff = utilizationPercentageCutOff; diskUtilizationSpaceCutoff = utilizationSpaceCutOff; diskUtilizationPercentageCutoff = @@ -131,7 +167,16 @@ class DirectoryCollection { * @return the failed directories */ synchronized List getFailedDirs() { - return Collections.unmodifiableList(failedDirs); + return Collections.unmodifiableList( + DirectoryCollection.concat(errorDirs, fullDirs)); + } + + /** + * @return the directories that have used all disk space + */ + + synchronized List getFullDirs() { + return fullDirs; } /** @@ -158,7 +203,7 @@ class DirectoryCollection { LOG.warn("Unable to create directory " + dir + " error " + e.getMessage() + ", removing from the list of valid directories."); localDirs.remove(dir); - failedDirs.add(dir); + errorDirs.add(dir); numFailures++; failed = true; } @@ -167,61 +212,147 @@ class DirectoryCollection { } /** - * Check the health of current set of local directories, updating the list - * of valid directories if necessary. - * @return true if there is a new disk-failure identified in - * this checking. false otherwise. + * Check the health of current set of local directories(good and failed), + * updating the list of valid directories if necessary. + * + * @return true if there is a new disk-failure identified in this + * checking or a failed directory passes the disk check false + * otherwise. */ synchronized boolean checkDirs() { - int oldNumFailures = numFailures; - HashSet checkFailedDirs = new HashSet(); - for (final String dir : localDirs) { + boolean setChanged = false; + Set preCheckGoodDirs = new HashSet(localDirs); + Set preCheckFullDirs = new HashSet(fullDirs); + Set preCheckOtherErrorDirs = new HashSet(errorDirs); + List failedDirs = DirectoryCollection.concat(errorDirs, fullDirs); + List allLocalDirs = + DirectoryCollection.concat(localDirs, failedDirs); + + Map dirsFailedCheck = testDirs(allLocalDirs); + + localDirs.clear(); + errorDirs.clear(); + fullDirs.clear(); + + for (Map.Entry entry : dirsFailedCheck + .entrySet()) { + String dir = entry.getKey(); + DiskErrorInformation errorInformation = entry.getValue(); + switch (entry.getValue().cause) { + case DISK_FULL: + fullDirs.add(entry.getKey()); + break; + case OTHER: + errorDirs.add(entry.getKey()); + break; + } + if (preCheckGoodDirs.contains(dir)) { + LOG.warn("Directory " + dir + " error, " + errorInformation.message + + ", removing from list of valid directories"); + setChanged = true; + numFailures++; + } + } + for (String dir : allLocalDirs) { + if (!dirsFailedCheck.containsKey(dir)) { + localDirs.add(dir); + if (preCheckFullDirs.contains(dir) + || preCheckOtherErrorDirs.contains(dir)) { + setChanged = true; + LOG.info("Directory " + dir + + " passed disk check, adding to list of valid directories."); + } + } + } + Set postCheckFullDirs = new HashSet(fullDirs); + Set postCheckOtherDirs = new HashSet(errorDirs); + for (String dir : preCheckFullDirs) { + if (postCheckOtherDirs.contains(dir)) { + LOG.warn("Directory " + dir + " error " + + dirsFailedCheck.get(dir).message); + } + } + + for (String dir : preCheckOtherErrorDirs) { + if (postCheckFullDirs.contains(dir)) { + LOG.warn("Directory " + dir + " error " + + dirsFailedCheck.get(dir).message); + } + } + return setChanged; + } + + Map testDirs(List dirs) { + HashMap ret = + new HashMap(); + for (final String dir : dirs) { + String msg; try { File testDir = new File(dir); DiskChecker.checkDir(testDir); - if (isDiskUsageUnderPercentageLimit(testDir)) { - LOG.warn("Directory " + dir - + " error, used space above threshold of " - + diskUtilizationPercentageCutoff - + "%, removing from the list of valid directories."); - checkFailedDirs.add(dir); - } else if (isDiskFreeSpaceWithinLimit(testDir)) { - LOG.warn("Directory " + dir + " error, free space below limit of " - + diskUtilizationSpaceCutoff - + "MB, removing from the list of valid directories."); - checkFailedDirs.add(dir); + if (isDiskUsageOverPercentageLimit(testDir)) { + msg = + "used space above threshold of " + + diskUtilizationPercentageCutoff + + "%"; + ret.put(dir, + new DiskErrorInformation(DiskErrorCause.DISK_FULL, msg)); + continue; + } else if (isDiskFreeSpaceUnderLimit(testDir)) { + msg = + "free space below limit of " + diskUtilizationSpaceCutoff + + "MB"; + ret.put(dir, + new DiskErrorInformation(DiskErrorCause.DISK_FULL, msg)); + continue; } - } catch (DiskErrorException de) { - LOG.warn("Directory " + dir + " error " + de.getMessage() - + ", removing from the list of valid directories."); - checkFailedDirs.add(dir); + + // create a random dir to make sure fs isn't in read-only mode + verifyDirUsingMkdir(testDir); + } catch (IOException ie) { + ret.put(dir, + new DiskErrorInformation(DiskErrorCause.OTHER, ie.getMessage())); } } - for (String dir : checkFailedDirs) { - localDirs.remove(dir); - failedDirs.add(dir); - numFailures++; - } - return numFailures > oldNumFailures; + return ret; } - - private boolean isDiskUsageUnderPercentageLimit(File dir) { + + /** + * Function to test whether a dir is working correctly by actually creating a + * random directory. + * + * @param dir + * the dir to test + */ + private void verifyDirUsingMkdir(File dir) throws IOException { + + String randomDirName = RandomStringUtils.randomAlphanumeric(5); + File target = new File(dir, randomDirName); + int i = 0; + while (target.exists()) { + + randomDirName = RandomStringUtils.randomAlphanumeric(5) + i; + target = new File(dir, randomDirName); + i++; + } + try { + DiskChecker.checkDir(target); + } finally { + FileUtils.deleteQuietly(target); + } + } + + private boolean isDiskUsageOverPercentageLimit(File dir) { float freePercentage = 100 * (dir.getUsableSpace() / (float) dir.getTotalSpace()); float usedPercentage = 100.0F - freePercentage; - if (usedPercentage > diskUtilizationPercentageCutoff - || usedPercentage >= 100.0F) { - return true; - } - return false; + return (usedPercentage > diskUtilizationPercentageCutoff + || usedPercentage >= 100.0F); } - private boolean isDiskFreeSpaceWithinLimit(File dir) { + private boolean isDiskFreeSpaceUnderLimit(File dir) { long freeSpace = dir.getUsableSpace() / (1024 * 1024); - if (freeSpace < this.diskUtilizationSpaceCutoff) { - return true; - } - return false; + return freeSpace < this.diskUtilizationSpaceCutoff; } private void createDir(FileContext localFs, Path dir, FsPermission perm) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java index b0539414de5..7d1aa534bef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java @@ -21,7 +21,9 @@ package org.apache.hadoop.yarn.server.nodemanager; import java.io.IOException; import java.net.URI; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.Timer; import java.util.TimerTask; @@ -150,7 +152,7 @@ public class LocalDirsHandlerService extends AbstractService { boolean createSucceeded = localDirs.createNonExistentDirs(localFs, perm); createSucceeded &= logDirs.createNonExistentDirs(localFs, perm); if (!createSucceeded) { - updateDirsAfterFailure(); + updateDirsAfterTest(); } // Check the disk health immediately to weed out bad directories @@ -197,9 +199,52 @@ public class LocalDirsHandlerService extends AbstractService { } /** + * @return the local directories which have no disk space + */ + public List getDiskFullLocalDirs() { + return localDirs.getFullDirs(); + } + + /** + * @return the log directories that have no disk space + */ + public List getDiskFullLogDirs() { + return logDirs.getFullDirs(); + } + + /** + * Function to get the local dirs which should be considered when cleaning up + * resources. Contains the good local dirs and the local dirs that have reached + * the disk space limit + * + * @return the local dirs which should be considered for cleaning up + */ + public List getLocalDirsForCleanup() { + return DirectoryCollection.concat(localDirs.getGoodDirs(), + localDirs.getFullDirs()); + } + + /** + * Function to get the log dirs which should be considered when cleaning up + * resources. Contains the good log dirs and the log dirs that have reached + * the disk space limit + * + * @return the log dirs which should be considered for cleaning up + */ + public List getLogDirsForCleanup() { + return DirectoryCollection.concat(logDirs.getGoodDirs(), + logDirs.getFullDirs()); + } + + /** + * Function to generate a report on the state of the disks. + * + * @param listGoodDirs + * flag to determine whether the report should report the state of + * good dirs or failed dirs * @return the health report of nm-local-dirs and nm-log-dirs */ - public String getDisksHealthReport() { + public String getDisksHealthReport(boolean listGoodDirs) { if (!isDiskHealthCheckerEnabled) { return ""; } @@ -207,20 +252,31 @@ public class LocalDirsHandlerService extends AbstractService { StringBuilder report = new StringBuilder(); List failedLocalDirsList = localDirs.getFailedDirs(); List failedLogDirsList = logDirs.getFailedDirs(); - int numLocalDirs = localDirs.getGoodDirs().size() - + failedLocalDirsList.size(); - int numLogDirs = logDirs.getGoodDirs().size() + failedLogDirsList.size(); - if (!failedLocalDirsList.isEmpty()) { - report.append(failedLocalDirsList.size() + "/" + numLocalDirs - + " local-dirs turned bad: " - + StringUtils.join(",", failedLocalDirsList) + ";"); - } - if (!failedLogDirsList.isEmpty()) { - report.append(failedLogDirsList.size() + "/" + numLogDirs - + " log-dirs turned bad: " - + StringUtils.join(",", failedLogDirsList)); + List goodLocalDirsList = localDirs.getGoodDirs(); + List goodLogDirsList = logDirs.getGoodDirs(); + int numLocalDirs = goodLocalDirsList.size() + failedLocalDirsList.size(); + int numLogDirs = goodLogDirsList.size() + failedLogDirsList.size(); + if (!listGoodDirs) { + if (!failedLocalDirsList.isEmpty()) { + report.append(failedLocalDirsList.size() + "/" + numLocalDirs + + " local-dirs are bad: " + + StringUtils.join(",", failedLocalDirsList) + "; "); + } + if (!failedLogDirsList.isEmpty()) { + report.append(failedLogDirsList.size() + "/" + numLogDirs + + " log-dirs are bad: " + StringUtils.join(",", failedLogDirsList)); + } + } else { + report.append(goodLocalDirsList.size() + "/" + numLocalDirs + + " local-dirs are good: " + StringUtils.join(",", goodLocalDirsList) + + "; "); + report.append(goodLogDirsList.size() + "/" + numLogDirs + + " log-dirs are good: " + StringUtils.join(",", goodLogDirsList)); + } + return report.toString(); + } /** @@ -262,8 +318,8 @@ public class LocalDirsHandlerService extends AbstractService { * Set good local dirs and good log dirs in the configuration so that the * LocalDirAllocator objects will use this updated configuration only. */ - private void updateDirsAfterFailure() { - LOG.info("Disk(s) failed. " + getDisksHealthReport()); + private void updateDirsAfterTest() { + Configuration conf = getConfig(); List localDirs = getLocalDirs(); conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, @@ -273,23 +329,91 @@ public class LocalDirsHandlerService extends AbstractService { logDirs.toArray(new String[logDirs.size()])); if (!areDisksHealthy()) { // Just log. - LOG.error("Most of the disks failed. " + getDisksHealthReport()); + LOG.error("Most of the disks failed. " + getDisksHealthReport(false)); } } - private void checkDirs() { - boolean newFailure = false; - if (localDirs.checkDirs()) { - newFailure = true; - } - if (logDirs.checkDirs()) { - newFailure = true; - } + private void logDiskStatus(boolean newDiskFailure, boolean diskTurnedGood) { + if (newDiskFailure) { + String report = getDisksHealthReport(false); + LOG.info("Disk(s) failed: " + report); + } + if (diskTurnedGood) { + String report = getDisksHealthReport(true); + LOG.info("Disk(s) turned good: " + report); + } - if (newFailure) { - updateDirsAfterFailure(); + } + + private void checkDirs() { + boolean disksStatusChange = false; + Set failedLocalDirsPreCheck = + new HashSet(localDirs.getFailedDirs()); + Set failedLogDirsPreCheck = + new HashSet(logDirs.getFailedDirs()); + + if (localDirs.checkDirs()) { + disksStatusChange = true; + } + if (logDirs.checkDirs()) { + disksStatusChange = true; + } + + Set failedLocalDirsPostCheck = + new HashSet(localDirs.getFailedDirs()); + Set failedLogDirsPostCheck = + new HashSet(logDirs.getFailedDirs()); + + boolean disksFailed = false; + boolean disksTurnedGood = false; + + disksFailed = + disksTurnedBad(failedLocalDirsPreCheck, failedLocalDirsPostCheck); + disksTurnedGood = + disksTurnedGood(failedLocalDirsPreCheck, failedLocalDirsPostCheck); + + // skip check if we have new failed or good local dirs since we're going to + // log anyway + if (!disksFailed) { + disksFailed = + disksTurnedBad(failedLogDirsPreCheck, failedLogDirsPostCheck); + } + if (!disksTurnedGood) { + disksTurnedGood = + disksTurnedGood(failedLogDirsPreCheck, failedLogDirsPostCheck); + } + + logDiskStatus(disksFailed, disksTurnedGood); + + if (disksStatusChange) { + updateDirsAfterTest(); + } + + lastDisksCheckTime = System.currentTimeMillis(); + } + + private boolean disksTurnedBad(Set preCheckFailedDirs, + Set postCheckDirs) { + boolean disksFailed = false; + for (String dir : postCheckDirs) { + if (!preCheckFailedDirs.contains(dir)) { + disksFailed = true; + break; } - lastDisksCheckTime = System.currentTimeMillis(); + } + return disksFailed; + } + + private boolean disksTurnedGood(Set preCheckDirs, + Set postCheckDirs) { + boolean disksTurnedGood = false; + for (String dir : preCheckDirs) { + if (!postCheckDirs.contains(dir)) { + disksTurnedGood = true; + break; + } + } + return disksTurnedGood; } public Path getLocalPathForWrite(String pathStr) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java index 446d05cb418..6d6001a1d2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java @@ -55,9 +55,9 @@ public class NodeHealthCheckerService extends CompositeService { String scriptReport = (nodeHealthScriptRunner == null) ? "" : nodeHealthScriptRunner.getHealthReport(); if (scriptReport.equals("")) { - return dirsHandler.getDisksHealthReport(); + return dirsHandler.getDisksHealthReport(false); } else { - return scriptReport.concat(SEPARATOR + dirsHandler.getDisksHealthReport()); + return scriptReport.concat(SEPARATOR + dirsHandler.getDisksHealthReport(false)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index 87a36c44126..f87ed6a1252 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -240,7 +240,7 @@ public class ContainerLaunch implements Callable { if (!dirsHandler.areDisksHealthy()) { ret = ContainerExitStatus.DISKS_FAILED; throw new IOException("Most of the disks failed. " - + dirsHandler.getDisksHealthReport()); + + dirsHandler.getDisksHealthReport(false)); } try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index d3b33e8b281..371684b71dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -55,11 +55,13 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.Credentials; @@ -170,6 +172,8 @@ public class ResourceLocalizationService extends CompositeService */ private final ConcurrentMap appRsrc = new ConcurrentHashMap(); + + FileContext lfs; public ResourceLocalizationService(Dispatcher dispatcher, ContainerExecutor exec, DeletionService delService, @@ -219,32 +223,17 @@ public class ResourceLocalizationService extends CompositeService this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); try { - FileContext lfs = getLocalFileContext(conf); - lfs.setUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK)); + lfs = getLocalFileContext(conf); + lfs.setUMask(new FsPermission((short) FsPermission.DEFAULT_UMASK)); - if (!stateStore.canRecover() || stateStore.isNewlyCreated()) { - cleanUpLocalDir(lfs,delService); + if (!stateStore.canRecover()|| stateStore.isNewlyCreated()) { + cleanUpLocalDirs(lfs, delService); + initializeLocalDirs(lfs); + initializeLogDirs(lfs); } - - List localDirs = dirsHandler.getLocalDirs(); - for (String localDir : localDirs) { - // $local/usercache - Path userDir = new Path(localDir, ContainerLocalizer.USERCACHE); - lfs.mkdir(userDir, null, true); - // $local/filecache - Path fileDir = new Path(localDir, ContainerLocalizer.FILECACHE); - lfs.mkdir(fileDir, null, true); - // $local/nmPrivate - Path sysDir = new Path(localDir, NM_PRIVATE_DIR); - lfs.mkdir(sysDir, NM_PRIVATE_PERM, true); - } - - List logDirs = dirsHandler.getLogDirs(); - for (String logDir : logDirs) { - lfs.mkdir(new Path(logDir), null, true); - } - } catch (IOException e) { - throw new YarnRuntimeException("Failed to initialize LocalizationService", e); + } catch (Exception e) { + throw new YarnRuntimeException( + "Failed to initialize LocalizationService", e); } cacheTargetSize = @@ -497,28 +486,45 @@ public class ResourceLocalizationService extends CompositeService String containerIDStr = c.toString(); String appIDStr = ConverterUtils.toString( c.getContainerId().getApplicationAttemptId().getApplicationId()); - for (String localDir : dirsHandler.getLocalDirs()) { + + // Try deleting from good local dirs and full local dirs because a dir might + // have gone bad while the app was running(disk full). In addition + // a dir might have become good while the app was running. + // Check if the container dir exists and if it does, try to delete it + for (String localDir : dirsHandler.getLocalDirsForCleanup()) { // Delete the user-owned container-dir Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE); Path userdir = new Path(usersdir, userName); Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE); Path appDir = new Path(allAppsdir, appIDStr); Path containerDir = new Path(appDir, containerIDStr); - delService.delete(userName, containerDir, new Path[] {}); + submitDirForDeletion(userName, containerDir); // Delete the nmPrivate container-dir - + Path sysDir = new Path(localDir, NM_PRIVATE_DIR); Path appSysDir = new Path(sysDir, appIDStr); Path containerSysDir = new Path(appSysDir, containerIDStr); - delService.delete(null, containerSysDir, new Path[] {}); + submitDirForDeletion(null, containerSysDir); } dispatcher.getEventHandler().handle( new ContainerEvent(c.getContainerId(), ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP)); } + + private void submitDirForDeletion(String userName, Path dir) { + try { + lfs.getFileStatus(dir); + delService.delete(userName, dir, new Path[] {}); + } catch (UnsupportedFileSystemException ue) { + LOG.warn("Local dir " + dir + " is an unsupported filesystem", ue); + } catch (IOException ie) { + // ignore + return; + } + } @SuppressWarnings({"unchecked"}) @@ -545,19 +551,22 @@ public class ResourceLocalizationService extends CompositeService } // Delete the application directories - for (String localDir : dirsHandler.getLocalDirs()) { + userName = application.getUser(); + appIDStr = application.toString(); + + for (String localDir : dirsHandler.getLocalDirsForCleanup()) { // Delete the user-owned app-dir Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE); Path userdir = new Path(usersdir, userName); Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE); Path appDir = new Path(allAppsdir, appIDStr); - delService.delete(userName, appDir, new Path[] {}); + submitDirForDeletion(userName, appDir); // Delete the nmPrivate app-dir Path sysDir = new Path(localDir, NM_PRIVATE_DIR); Path appSysDir = new Path(sysDir, appIDStr); - delService.delete(null, appSysDir, new Path[] {}); + submitDirForDeletion(null, appSysDir); } // TODO: decrement reference counts of all resources associated with this @@ -590,8 +599,8 @@ public class ResourceLocalizationService extends CompositeService private String getAppFileCachePath(String user, String appId) { return StringUtils.join(Path.SEPARATOR, Arrays.asList(".", - ContainerLocalizer.USERCACHE, user, ContainerLocalizer.APPCACHE, appId, - ContainerLocalizer.FILECACHE)); + ContainerLocalizer.USERCACHE, user, ContainerLocalizer.APPCACHE, appId, + ContainerLocalizer.FILECACHE)); } @VisibleForTesting @@ -868,7 +877,7 @@ public class ResourceLocalizationService extends CompositeService /** * Find next resource to be given to a spawned localizer. * - * @return + * @return the next resource to be localized */ private LocalResource findNextResource() { synchronized (pending) { @@ -1071,8 +1080,8 @@ public class ResourceLocalizationService extends CompositeService // 1) write credentials to private dir writeCredentials(nmPrivateCTokensPath); // 2) exec initApplication and wait - List localDirs = dirsHandler.getLocalDirs(); - List logDirs = dirsHandler.getLogDirs(); + List localDirs = getInitializedLocalDirs(); + List logDirs = getInitializedLogDirs(); if (dirsHandler.areDisksHealthy()) { exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress, context.getUser(), @@ -1082,7 +1091,7 @@ public class ResourceLocalizationService extends CompositeService localizerId, localDirs, logDirs); } else { throw new IOException("All disks failed. " - + dirsHandler.getDisksHealthReport()); + + dirsHandler.getDisksHealthReport(false)); } // TODO handle ExitCodeException separately? } catch (Exception e) { @@ -1151,21 +1160,92 @@ public class ResourceLocalizationService extends CompositeService } - private void cleanUpLocalDir(FileContext lfs, DeletionService del) { - long currentTimeStamp = System.currentTimeMillis(); - for (String localDir : dirsHandler.getLocalDirs()) { - renameLocalDir(lfs, localDir, ContainerLocalizer.USERCACHE, - currentTimeStamp); - renameLocalDir(lfs, localDir, ContainerLocalizer.FILECACHE, - currentTimeStamp); - renameLocalDir(lfs, localDir, ResourceLocalizationService.NM_PRIVATE_DIR, - currentTimeStamp); + private void initializeLocalDirs(FileContext lfs) { + List localDirs = dirsHandler.getLocalDirs(); + for (String localDir : localDirs) { + initializeLocalDir(lfs, localDir); + } + } + + private void initializeLocalDir(FileContext lfs, String localDir) { + + Map pathPermissionMap = getLocalDirsPathPermissionsMap(localDir); + for (Map.Entry entry : pathPermissionMap.entrySet()) { + FileStatus status; try { - deleteLocalDir(lfs, del, localDir); - } catch (IOException e) { - // Do nothing, just give the warning - LOG.warn("Failed to delete localDir: " + localDir); + status = lfs.getFileStatus(entry.getKey()); } + catch(FileNotFoundException fs) { + status = null; + } + catch(IOException ie) { + String msg = "Could not get file status for local dir " + entry.getKey(); + LOG.warn(msg, ie); + throw new YarnRuntimeException(msg, ie); + } + if(status == null) { + try { + lfs.mkdir(entry.getKey(), entry.getValue(), true); + status = lfs.getFileStatus(entry.getKey()); + } catch (IOException e) { + String msg = "Could not initialize local dir " + entry.getKey(); + LOG.warn(msg, e); + throw new YarnRuntimeException(msg, e); + } + } + FsPermission perms = status.getPermission(); + if(!perms.equals(entry.getValue())) { + try { + lfs.setPermission(entry.getKey(), entry.getValue()); + } + catch(IOException ie) { + String msg = "Could not set permissions for local dir " + entry.getKey(); + LOG.warn(msg, ie); + throw new YarnRuntimeException(msg, ie); + } + } + } + } + + private void initializeLogDirs(FileContext lfs) { + List logDirs = dirsHandler.getLogDirs(); + for (String logDir : logDirs) { + initializeLogDir(lfs, logDir); + } + } + + private void initializeLogDir(FileContext lfs, String logDir) { + try { + lfs.mkdir(new Path(logDir), null, true); + } catch (FileAlreadyExistsException fe) { + // do nothing + } catch (IOException e) { + String msg = "Could not initialize log dir " + logDir; + LOG.warn(msg, e); + throw new YarnRuntimeException(msg, e); + } + } + + private void cleanUpLocalDirs(FileContext lfs, DeletionService del) { + for (String localDir : dirsHandler.getLocalDirs()) { + cleanUpLocalDir(lfs, del, localDir); + } + } + + private void cleanUpLocalDir(FileContext lfs, DeletionService del, + String localDir) { + long currentTimeStamp = System.currentTimeMillis(); + renameLocalDir(lfs, localDir, ContainerLocalizer.USERCACHE, + currentTimeStamp); + renameLocalDir(lfs, localDir, ContainerLocalizer.FILECACHE, + currentTimeStamp); + renameLocalDir(lfs, localDir, ResourceLocalizationService.NM_PRIVATE_DIR, + currentTimeStamp); + try { + deleteLocalDir(lfs, del, localDir); + } catch (IOException e) { + // Do nothing, just give the warning + LOG.warn("Failed to delete localDir: " + localDir); } } @@ -1234,5 +1314,95 @@ public class ResourceLocalizationService extends CompositeService del.scheduleFileDeletionTask(dependentDeletionTask); } } + + /** + * Synchronized method to get a list of initialized local dirs. Method will + * check each local dir to ensure it has been setup correctly and will attempt + * to fix any issues it finds. + * + * @return list of initialized local dirs + */ + synchronized private List getInitializedLocalDirs() { + List dirs = dirsHandler.getLocalDirs(); + List checkFailedDirs = new ArrayList(); + for (String dir : dirs) { + try { + checkLocalDir(dir); + } catch (YarnRuntimeException e) { + checkFailedDirs.add(dir); + } + } + for (String dir : checkFailedDirs) { + LOG.info("Attempting to initialize " + dir); + initializeLocalDir(lfs, dir); + try { + checkLocalDir(dir); + } catch (YarnRuntimeException e) { + String msg = + "Failed to setup local dir " + dir + ", which was marked as good."; + LOG.warn(msg, e); + throw new YarnRuntimeException(msg, e); + } + } + return dirs; + } + private boolean checkLocalDir(String localDir) { + + Map pathPermissionMap = getLocalDirsPathPermissionsMap(localDir); + + for (Map.Entry entry : pathPermissionMap.entrySet()) { + FileStatus status; + try { + status = lfs.getFileStatus(entry.getKey()); + } catch (Exception e) { + String msg = + "Could not carry out resource dir checks for " + localDir + + ", which was marked as good"; + LOG.warn(msg, e); + throw new YarnRuntimeException(msg, e); + } + + if (!status.getPermission().equals(entry.getValue())) { + String msg = + "Permissions incorrectly set for dir " + entry.getKey() + + ", should be " + entry.getValue() + ", actual value = " + + status.getPermission(); + LOG.warn(msg); + throw new YarnRuntimeException(msg); + } + } + return true; + } + + private Map getLocalDirsPathPermissionsMap(String localDir) { + Map localDirPathFsPermissionsMap = new HashMap(); + + FsPermission defaultPermission = + FsPermission.getDirDefault().applyUMask(lfs.getUMask()); + FsPermission nmPrivatePermission = + NM_PRIVATE_PERM.applyUMask(lfs.getUMask()); + + Path userDir = new Path(localDir, ContainerLocalizer.USERCACHE); + Path fileDir = new Path(localDir, ContainerLocalizer.FILECACHE); + Path sysDir = new Path(localDir, NM_PRIVATE_DIR); + + localDirPathFsPermissionsMap.put(userDir, defaultPermission); + localDirPathFsPermissionsMap.put(fileDir, defaultPermission); + localDirPathFsPermissionsMap.put(sysDir, nmPrivatePermission); + return localDirPathFsPermissionsMap; + } + + /** + * Synchronized method to get a list of initialized log dirs. Method will + * check each local dir to ensure it has been setup correctly and will attempt + * to fix any issues it finds. + * + * @return list of initialized log dirs + */ + synchronized private List getInitializedLogDirs() { + List dirs = dirsHandler.getLogDirs(); + initializeLogDirs(lfs); + return dirs; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 6e196bbbfe0..43cd7b5012d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregatio import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; @@ -37,9 +38,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -107,6 +110,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator { private final AtomicBoolean appAggregationFinished = new AtomicBoolean(); private final AtomicBoolean aborted = new AtomicBoolean(); private final Map appAcls; + private final FileContext lfs; private final LogAggregationContext logAggregationContext; private final Context context; private final int retentionSize; @@ -122,8 +126,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator { LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp, ContainerLogsRetentionPolicy retentionPolicy, Map appAcls, - LogAggregationContext logAggregationContext, - Context context) { + LogAggregationContext logAggregationContext, Context context, + FileContext lfs) { this.dispatcher = dispatcher; this.conf = conf; this.delService = deletionService; @@ -136,6 +140,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator { this.retentionPolicy = retentionPolicy; this.pendingContainers = new LinkedBlockingQueue(); this.appAcls = appAcls; + this.lfs = lfs; this.logAggregationContext = logAggregationContext; this.context = context; this.nodeId = nodeId; @@ -395,15 +400,25 @@ public class AppLogAggregatorImpl implements AppLogAggregator { uploadLogsForContainers(); // Remove the local app-log-dirs - List rootLogDirs = dirsHandler.getLogDirs(); - Path[] localAppLogDirs = new Path[rootLogDirs.size()]; - int index = 0; - for (String rootLogDir : rootLogDirs) { - localAppLogDirs[index] = new Path(rootLogDir, this.applicationId); - index++; + List localAppLogDirs = new ArrayList(); + for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) { + Path logPath = new Path(rootLogDir, applicationId); + try { + // check if log dir exists + lfs.getFileStatus(logPath); + localAppLogDirs.add(logPath); + } catch (UnsupportedFileSystemException ue) { + LOG.warn("Log dir " + rootLogDir + "is an unsupported file system", ue); + continue; + } catch (IOException fe) { + continue; + } + } + + if (localAppLogDirs.size() > 0) { + this.delService.delete(this.userUgi.getShortUserName(), null, + localAppLogDirs.toArray(new Path[localAppLogDirs.size()])); } - this.delService.delete(this.userUgi.getShortUserName(), null, - localAppLogDirs); this.dispatcher.getEventHandler().handle( new ApplicationEvent(this.appId, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index 1d6a9e168ce..77176b7e263 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -326,6 +327,15 @@ public class LogAggregationService extends AbstractService implements } this.dispatcher.getEventHandler().handle(eventResponse); } + + FileContext getLocalFileContext(Configuration conf) { + try { + return FileContext.getLocalFSFileContext(conf); + } catch (IOException e) { + throw new YarnRuntimeException("Failed to access local fs"); + } + } + protected void initAppAggregator(final ApplicationId appId, String user, Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy, @@ -344,7 +354,8 @@ public class LogAggregationService extends AbstractService implements new AppLogAggregatorImpl(this.dispatcher, this.deletionService, getConfig(), appId, userUgi, this.nodeId, dirsHandler, getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy, - appAcls, logAggregationContext, this.context); + appAcls, logAggregationContext, this.context, + getLocalFileContext(getConfig())); if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) { throw new YarnRuntimeException("Duplicate initApp for " + appId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java index 40173e1be21..0422ef9eea4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler; +import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -28,11 +30,14 @@ import java.util.concurrent.RejectedExecutionException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; @@ -96,6 +101,15 @@ public class NonAggregatingLogHandler extends AbstractService implements } super.serviceStop(); } + + FileContext getLocalFileContext(Configuration conf) { + try { + return FileContext.getLocalFSFileContext(conf); + } catch (IOException e) { + throw new YarnRuntimeException("Failed to access local fs"); + } + } + @SuppressWarnings("unchecked") @Override @@ -160,21 +174,30 @@ public class NonAggregatingLogHandler extends AbstractService implements @Override @SuppressWarnings("unchecked") public void run() { - List rootLogDirs = - NonAggregatingLogHandler.this.dirsHandler.getLogDirs(); - Path[] localAppLogDirs = new Path[rootLogDirs.size()]; - int index = 0; - for (String rootLogDir : rootLogDirs) { - localAppLogDirs[index] = new Path(rootLogDir, applicationId.toString()); - index++; + List localAppLogDirs = new ArrayList(); + FileContext lfs = getLocalFileContext(getConfig()); + for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) { + Path logDir = new Path(rootLogDir, applicationId.toString()); + try { + lfs.getFileStatus(logDir); + localAppLogDirs.add(logDir); + } catch (UnsupportedFileSystemException ue) { + LOG.warn("Unsupported file system used for log dir " + logDir, ue); + continue; + } catch (IOException ie) { + continue; + } } + // Inform the application before the actual delete itself, so that links - // to logs will no longer be there on NM web-UI. + // to logs will no longer be there on NM web-UI. NonAggregatingLogHandler.this.dispatcher.getEventHandler().handle( - new ApplicationEvent(this.applicationId, - ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)); - NonAggregatingLogHandler.this.delService.delete(user, null, - localAppLogDirs); + new ApplicationEvent(this.applicationId, + ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)); + if (localAppLogDirs.size() > 0) { + NonAggregatingLogHandler.this.delService.delete(user, null, + (Path[]) localAppLogDirs.toArray(new Path[localAppLogDirs.size()])); + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java index f19731f1369..e4353757969 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java @@ -34,6 +34,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Before; import org.junit.Test; public class TestDirectoryCollection { @@ -42,8 +44,13 @@ public class TestDirectoryCollection { TestDirectoryCollection.class.getName()).getAbsoluteFile(); private static final File testFile = new File(testDir, "testfile"); + private Configuration conf; + private FileContext localFs; + @Before - public void setup() throws IOException { + public void setupForTests() throws IOException { + conf = new Configuration(); + localFs = FileContext.getLocalFSFileContext(conf); testDir.mkdirs(); testFile.createNewFile(); } @@ -56,11 +63,12 @@ public class TestDirectoryCollection { @Test public void testConcurrentAccess() throws IOException { // Initialize DirectoryCollection with a file instead of a directory - Configuration conf = new Configuration(); + String[] dirs = {testFile.getPath()}; - DirectoryCollection dc = new DirectoryCollection(dirs, - conf.getFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, - YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE)); + DirectoryCollection dc = + new DirectoryCollection(dirs, conf.getFloat( + YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, + YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE)); // Create an iterator before checkDirs is called to reliable test case List list = dc.getGoodDirs(); @@ -78,9 +86,8 @@ public class TestDirectoryCollection { @Test public void testCreateDirectories() throws IOException { - Configuration conf = new Configuration(); + conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); - FileContext localFs = FileContext.getLocalFSFileContext(conf); String dirA = new File(testDir, "dirA").getPath(); String dirB = new File(dirA, "dirB").getPath(); @@ -92,9 +99,10 @@ public class TestDirectoryCollection { localFs.setPermission(pathC, permDirC); String[] dirs = { dirA, dirB, dirC }; - DirectoryCollection dc = new DirectoryCollection(dirs, - conf.getFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, - YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE)); + DirectoryCollection dc = + new DirectoryCollection(dirs, conf.getFloat( + YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, + YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE)); FsPermission defaultPerm = FsPermission.getDefault() .applyUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK)); boolean createResult = dc.createNonExistentDirs(localFs, defaultPerm); @@ -120,25 +128,29 @@ public class TestDirectoryCollection { dc.checkDirs(); Assert.assertEquals(0, dc.getGoodDirs().size()); Assert.assertEquals(1, dc.getFailedDirs().size()); + Assert.assertEquals(1, dc.getFullDirs().size()); dc = new DirectoryCollection(dirs, 100.0F); dc.checkDirs(); Assert.assertEquals(1, dc.getGoodDirs().size()); Assert.assertEquals(0, dc.getFailedDirs().size()); + Assert.assertEquals(0, dc.getFullDirs().size()); dc = new DirectoryCollection(dirs, testDir.getTotalSpace() / (1024 * 1024)); dc.checkDirs(); Assert.assertEquals(0, dc.getGoodDirs().size()); Assert.assertEquals(1, dc.getFailedDirs().size()); + Assert.assertEquals(1, dc.getFullDirs().size()); dc = new DirectoryCollection(dirs, 100.0F, 0); dc.checkDirs(); Assert.assertEquals(1, dc.getGoodDirs().size()); Assert.assertEquals(0, dc.getFailedDirs().size()); + Assert.assertEquals(0, dc.getFullDirs().size()); } @Test - public void testDiskLimitsCutoffSetters() { + public void testDiskLimitsCutoffSetters() throws IOException { String[] dirs = { "dir" }; DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F, 100); @@ -162,6 +174,47 @@ public class TestDirectoryCollection { Assert.assertEquals(0, dc.getDiskUtilizationSpaceCutoff()); } + @Test + public void testFailedDisksBecomingGoodAgain() throws Exception { + + String dirA = new File(testDir, "dirA").getPath(); + String[] dirs = { dirA }; + DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F); + dc.checkDirs(); + Assert.assertEquals(0, dc.getGoodDirs().size()); + Assert.assertEquals(1, dc.getFailedDirs().size()); + Assert.assertEquals(1, dc.getFullDirs().size()); + + dc.setDiskUtilizationPercentageCutoff(100.0F); + dc.checkDirs(); + Assert.assertEquals(1, dc.getGoodDirs().size()); + Assert.assertEquals(0, dc.getFailedDirs().size()); + Assert.assertEquals(0, dc.getFullDirs().size()); + + conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); + + String dirB = new File(testDir, "dirB").getPath(); + Path pathB = new Path(dirB); + FsPermission permDirB = new FsPermission((short) 0400); + + localFs.mkdir(pathB, null, true); + localFs.setPermission(pathB, permDirB); + + String[] dirs2 = { dirB }; + + dc = new DirectoryCollection(dirs2, 100.0F); + dc.checkDirs(); + Assert.assertEquals(0, dc.getGoodDirs().size()); + Assert.assertEquals(1, dc.getFailedDirs().size()); + Assert.assertEquals(0, dc.getFullDirs().size()); + permDirB = new FsPermission((short) 0700); + localFs.setPermission(pathB, permDirB); + dc.checkDirs(); + Assert.assertEquals(1, dc.getGoodDirs().size()); + Assert.assertEquals(0, dc.getFailedDirs().size()); + Assert.assertEquals(0, dc.getFullDirs().size()); + } + @Test public void testConstructors() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java index 057ea917b15..e22b7f9f3d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java @@ -21,8 +21,13 @@ package org.apache.hadoop.yarn.server.nodemanager; import java.io.File; import java.io.IOException; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -57,10 +62,11 @@ public class TestLocalDirsHandlerService { LocalDirsHandlerService dirSvc = new LocalDirsHandlerService(); dirSvc.init(conf); Assert.assertEquals(1, dirSvc.getLocalDirs().size()); + dirSvc.close(); } @Test - public void testValidPathsDirHandlerService() { + public void testValidPathsDirHandlerService() throws Exception { Configuration conf = new YarnConfiguration(); String localDir1 = new File("file:///" + testDir, "localDir1").getPath(); String localDir2 = new File("hdfs:///" + testDir, "localDir2").getPath(); @@ -76,5 +82,40 @@ public class TestLocalDirsHandlerService { Assert.assertEquals("Service should not be inited", STATE.STOPPED, dirSvc.getServiceState()); + dirSvc.close(); + } + + @Test + public void testGetFullDirs() throws Exception { + Configuration conf = new YarnConfiguration(); + + conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); + FileContext localFs = FileContext.getLocalFSFileContext(conf); + + String localDir1 = new File(testDir, "localDir1").getPath(); + String localDir2 = new File(testDir, "localDir2").getPath(); + String logDir1 = new File(testDir, "logDir1").getPath(); + String logDir2 = new File(testDir, "logDir2").getPath(); + Path localDir1Path = new Path(localDir1); + Path logDir1Path = new Path(logDir1); + FsPermission dirPermissions = new FsPermission((short) 0410); + localFs.mkdir(localDir1Path, dirPermissions, true); + localFs.mkdir(logDir1Path, dirPermissions, true); + + conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir1 + "," + localDir2); + conf.set(YarnConfiguration.NM_LOG_DIRS, logDir1 + "," + logDir2); + conf.setFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, + 0.0f); + LocalDirsHandlerService dirSvc = new LocalDirsHandlerService(); + dirSvc.init(conf); + Assert.assertEquals(0, dirSvc.getLocalDirs().size()); + Assert.assertEquals(0, dirSvc.getLogDirs().size()); + Assert.assertEquals(1, dirSvc.getDiskFullLocalDirs().size()); + Assert.assertEquals(1, dirSvc.getDiskFullLogDirs().size()); + FileUtils.deleteDirectory(new File(localDir1)); + FileUtils.deleteDirectory(new File(localDir2)); + FileUtils.deleteDirectory(new File(logDir1)); + FileUtils.deleteDirectory(new File(logDir1)); + dirSvc.close(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeHealthService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeHealthService.java index 6a28605a5ab..35421968cfe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeHealthService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeHealthService.java @@ -196,7 +196,7 @@ public class TestNodeHealthService { healthStatus.getHealthReport().equals( NodeHealthScriptRunner.NODE_HEALTH_SCRIPT_TIMED_OUT_MSG + NodeHealthCheckerService.SEPARATOR - + nodeHealthChecker.getDiskHandler().getDisksHealthReport())); + + nodeHealthChecker.getDiskHandler().getDisksHealthReport(false))); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index fa5a4fcf5ed..d569fa797c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -42,6 +42,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.net.InetSocketAddress; @@ -68,13 +69,16 @@ import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.security.AccessControlException; import org.junit.Assert; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.AbstractFileSystem; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; @@ -167,15 +171,15 @@ public class TestResourceLocalizationService { conf = new Configuration(); spylfs = spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); lfs = FileContext.getFileContext(spylfs, conf); - doNothing().when(spylfs).mkdir( - isA(Path.class), isA(FsPermission.class), anyBoolean()); + String logDir = lfs.makeQualified(new Path(basedir, "logdir ")).toString(); conf.set(YarnConfiguration.NM_LOG_DIRS, logDir); } @After - public void cleanup() { + public void cleanup() throws IOException { conf = null; + FileUtils.deleteDirectory(new File(basedir.toString())); } @Test @@ -752,6 +756,39 @@ public class TestResourceLocalizationService { ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class)); + FsPermission defaultPermission = + FsPermission.getDirDefault().applyUMask(lfs.getUMask()); + FsPermission nmPermission = + ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask()); + final Path userDir = + new Path(sDirs[0].substring("file:".length()), + ContainerLocalizer.USERCACHE); + final Path fileDir = + new Path(sDirs[0].substring("file:".length()), + ContainerLocalizer.FILECACHE); + final Path sysDir = + new Path(sDirs[0].substring("file:".length()), + ResourceLocalizationService.NM_PRIVATE_DIR); + final FileStatus fs = + new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0, + defaultPermission, "", "", new Path(sDirs[0])); + final FileStatus nmFs = + new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0, + nmPermission, "", "", sysDir); + + doAnswer(new Answer() { + @Override + public FileStatus answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + if (args.length > 0) { + if (args[0].equals(userDir) || args[0].equals(fileDir)) { + return fs; + } + } + return nmFs; + } + }).when(spylfs).getFileStatus(isA(Path.class)); + try { spyService.init(conf); spyService.start(); @@ -1775,5 +1812,274 @@ public class TestResourceLocalizationService { return new Token(("ident" + id).getBytes(), ("passwd" + id).getBytes(), new Text("kind" + id), new Text("service" + id)); } + + /* + * Test to ensure ResourceLocalizationService can handle local dirs going bad. + * Test first sets up all the components required, then sends events to fetch + * a private, app and public resource. It then sends events to clean up the + * container and the app and ensures the right delete calls were made. + */ + @Test + @SuppressWarnings("unchecked") + // mocked generics + public void testFailedDirsResourceRelease() throws Exception { + // setup components + File f = new File(basedir.toString()); + String[] sDirs = new String[4]; + List localDirs = new ArrayList(sDirs.length); + for (int i = 0; i < 4; ++i) { + sDirs[i] = f.getAbsolutePath() + i; + localDirs.add(new Path(sDirs[i])); + } + List containerLocalDirs = new ArrayList(localDirs.size()); + List appLocalDirs = new ArrayList(localDirs.size()); + List nmLocalContainerDirs = new ArrayList(localDirs.size()); + List nmLocalAppDirs = new ArrayList(localDirs.size()); + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + conf.setLong(YarnConfiguration.NM_DISK_HEALTH_CHECK_INTERVAL_MS, 500); + + LocalizerTracker mockLocallilzerTracker = mock(LocalizerTracker.class); + DrainDispatcher dispatcher = new DrainDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + EventHandler applicationBus = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, applicationBus); + EventHandler containerBus = mock(EventHandler.class); + dispatcher.register(ContainerEventType.class, containerBus); + // Ignore actual localization + EventHandler localizerBus = mock(EventHandler.class); + dispatcher.register(LocalizerEventType.class, localizerBus); + + ContainerExecutor exec = mock(ContainerExecutor.class); + LocalDirsHandlerService mockDirsHandler = + mock(LocalDirsHandlerService.class); + doReturn(new ArrayList(Arrays.asList(sDirs))).when( + mockDirsHandler).getLocalDirsForCleanup(); + + DeletionService delService = mock(DeletionService.class); + + // setup mocks + ResourceLocalizationService rawService = + new ResourceLocalizationService(dispatcher, exec, delService, + mockDirsHandler, new NMNullStateStoreService()); + ResourceLocalizationService spyService = spy(rawService); + doReturn(mockServer).when(spyService).createServer(); + doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker( + isA(Configuration.class)); + doReturn(lfs).when(spyService) + .getLocalFileContext(isA(Configuration.class)); + FsPermission defaultPermission = + FsPermission.getDirDefault().applyUMask(lfs.getUMask()); + FsPermission nmPermission = + ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask()); + final FileStatus fs = + new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0, + defaultPermission, "", "", localDirs.get(0)); + final FileStatus nmFs = + new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0, + nmPermission, "", "", localDirs.get(0)); + + final String user = "user0"; + // init application + final Application app = mock(Application.class); + final ApplicationId appId = + BuilderUtils.newApplicationId(314159265358979L, 3); + when(app.getUser()).thenReturn(user); + when(app.getAppId()).thenReturn(appId); + when(app.toString()).thenReturn(ConverterUtils.toString(appId)); + + // init container. + final Container c = getMockContainer(appId, 42, user); + + // setup local app dirs + List tmpDirs = mockDirsHandler.getLocalDirs(); + for (int i = 0; i < tmpDirs.size(); ++i) { + Path usersdir = new Path(tmpDirs.get(i), ContainerLocalizer.USERCACHE); + Path userdir = new Path(usersdir, user); + Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE); + Path appDir = new Path(allAppsdir, ConverterUtils.toString(appId)); + Path containerDir = + new Path(appDir, ConverterUtils.toString(c.getContainerId())); + containerLocalDirs.add(containerDir); + appLocalDirs.add(appDir); + + Path sysDir = + new Path(tmpDirs.get(i), ResourceLocalizationService.NM_PRIVATE_DIR); + Path appSysDir = new Path(sysDir, ConverterUtils.toString(appId)); + Path containerSysDir = + new Path(appSysDir, ConverterUtils.toString(c.getContainerId())); + + nmLocalContainerDirs.add(containerSysDir); + nmLocalAppDirs.add(appSysDir); + } + + try { + spyService.init(conf); + spyService.start(); + + spyService.handle(new ApplicationLocalizationEvent( + LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); + dispatcher.await(); + + // Get a handle on the trackers after they're setup with + // INIT_APP_RESOURCES + LocalResourcesTracker appTracker = + spyService.getLocalResourcesTracker( + LocalResourceVisibility.APPLICATION, user, appId); + LocalResourcesTracker privTracker = + spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE, + user, appId); + LocalResourcesTracker pubTracker = + spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC, + user, appId); + + // init resources + Random r = new Random(); + long seed = r.nextLong(); + r.setSeed(seed); + + // Send localization requests, one for each type of resource + final LocalResource privResource = getPrivateMockedResource(r); + final LocalResourceRequest privReq = + new LocalResourceRequest(privResource); + + final LocalResource appResource = getAppMockedResource(r); + final LocalResourceRequest appReq = new LocalResourceRequest(appResource); + + final LocalResource pubResource = getPublicMockedResource(r); + final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource); + + Map> req = + new HashMap>(); + req.put(LocalResourceVisibility.PRIVATE, + Collections.singletonList(privReq)); + req.put(LocalResourceVisibility.APPLICATION, + Collections.singletonList(appReq)); + req + .put(LocalResourceVisibility.PUBLIC, Collections.singletonList(pubReq)); + + Map> req2 = + new HashMap>(); + req2.put(LocalResourceVisibility.PRIVATE, + Collections.singletonList(privReq)); + + // Send Request event + spyService.handle(new ContainerLocalizationRequestEvent(c, req)); + spyService.handle(new ContainerLocalizationRequestEvent(c, req2)); + dispatcher.await(); + + int privRsrcCount = 0; + for (LocalizedResource lr : privTracker) { + privRsrcCount++; + Assert.assertEquals("Incorrect reference count", 2, lr.getRefCount()); + Assert.assertEquals(privReq, lr.getRequest()); + } + Assert.assertEquals(1, privRsrcCount); + + int appRsrcCount = 0; + for (LocalizedResource lr : appTracker) { + appRsrcCount++; + Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount()); + Assert.assertEquals(appReq, lr.getRequest()); + } + Assert.assertEquals(1, appRsrcCount); + + int pubRsrcCount = 0; + for (LocalizedResource lr : pubTracker) { + pubRsrcCount++; + Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount()); + Assert.assertEquals(pubReq, lr.getRequest()); + } + Assert.assertEquals(1, pubRsrcCount); + + // setup mocks for test, a set of dirs with IOExceptions and let the rest + // go through + for (int i = 0; i < containerLocalDirs.size(); ++i) { + if (i == 2) { + Mockito.doThrow(new IOException()).when(spylfs) + .getFileStatus(eq(containerLocalDirs.get(i))); + Mockito.doThrow(new IOException()).when(spylfs) + .getFileStatus(eq(nmLocalContainerDirs.get(i))); + } else { + doReturn(fs).when(spylfs) + .getFileStatus(eq(containerLocalDirs.get(i))); + doReturn(nmFs).when(spylfs).getFileStatus( + eq(nmLocalContainerDirs.get(i))); + } + } + + // Send Cleanup Event + spyService.handle(new ContainerLocalizationCleanupEvent(c, req)); + verify(mockLocallilzerTracker).cleanupPrivLocalizers( + "container_314159265358979_0003_01_000042"); + + // match cleanup events with the mocks we setup earlier + for (int i = 0; i < containerLocalDirs.size(); ++i) { + if (i == 2) { + try { + verify(delService).delete(user, containerLocalDirs.get(i)); + verify(delService).delete(null, nmLocalContainerDirs.get(i)); + Assert.fail("deletion attempts for invalid dirs"); + } catch (Throwable e) { + continue; + } + } else { + verify(delService).delete(user, containerLocalDirs.get(i)); + verify(delService).delete(null, nmLocalContainerDirs.get(i)); + } + } + + ArgumentMatcher matchesAppDestroy = + new ArgumentMatcher() { + @Override + public boolean matches(Object o) { + ApplicationEvent evt = (ApplicationEvent) o; + return (evt.getType() == ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP) + && appId == evt.getApplicationID(); + } + }; + + dispatcher.await(); + + // setup mocks again, this time throw UnsupportedFileSystemException and + // IOExceptions + for (int i = 0; i < containerLocalDirs.size(); ++i) { + if (i == 3) { + Mockito.doThrow(new IOException()).when(spylfs) + .getFileStatus(eq(appLocalDirs.get(i))); + Mockito.doThrow(new UnsupportedFileSystemException("test")) + .when(spylfs).getFileStatus(eq(nmLocalAppDirs.get(i))); + } else { + doReturn(fs).when(spylfs).getFileStatus(eq(appLocalDirs.get(i))); + doReturn(nmFs).when(spylfs).getFileStatus(eq(nmLocalAppDirs.get(i))); + } + } + LocalizationEvent destroyApp = + new ApplicationLocalizationEvent( + LocalizationEventType.DESTROY_APPLICATION_RESOURCES, app); + spyService.handle(destroyApp); + verify(applicationBus).handle(argThat(matchesAppDestroy)); + + // verify we got the right delete calls + for (int i = 0; i < containerLocalDirs.size(); ++i) { + if (i == 3) { + try { + verify(delService).delete(user, containerLocalDirs.get(i)); + verify(delService).delete(null, nmLocalContainerDirs.get(i)); + Assert.fail("deletion attempts for invalid dirs"); + } catch (Throwable e) { + continue; + } + } else { + verify(delService).delete(user, appLocalDirs.get(i)); + verify(delService).delete(null, nmLocalAppDirs.get(i)); + } + } + + } finally { + dispatcher.stop(); + delService.stop(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index ab86a18df5c..8a5441a615a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -25,6 +25,7 @@ import static org.mockito.Matchers.anyMap; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -58,6 +59,8 @@ import org.junit.Assert; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.AbstractFileSystem; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -105,6 +108,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerM import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.TestNonAggregatingLogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; @@ -136,12 +140,19 @@ public class TestLogAggregationService extends BaseContainerManagerTest { super(); this.remoteRootLogDir.mkdir(); } + + DrainDispatcher dispatcher; + EventHandler appEventHandler; @Override + @SuppressWarnings("unchecked") public void setup() throws IOException { super.setup(); NodeId nodeId = NodeId.newInstance("0.0.0.0", 5555); ((NMContext)context).setNodeId(nodeId); + dispatcher = createDispatcher(); + appEventHandler = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, appEventHandler); } @Override @@ -149,10 +160,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest { super.tearDown(); createContainerExecutor().deleteAsUser(user, new Path(remoteRootLogDir.getAbsolutePath()), new Path[] {}); + dispatcher.await(); + dispatcher.stop(); + dispatcher.close(); } @Test - @SuppressWarnings("unchecked") public void testLocalFileDeletionAfterUpload() throws Exception { this.delSrvc = new DeletionService(createContainerExecutor()); delSrvc = spy(delSrvc); @@ -161,10 +174,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest { this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); - DrainDispatcher dispatcher = createDispatcher(); - EventHandler appEventHandler = mock(EventHandler.class); - dispatcher.register(ApplicationEventType.class, appEventHandler); - LogAggregationService logAggregationService = spy( new LogAggregationService(dispatcher, this.context, this.delSrvc, super.dirsHandler)); @@ -236,16 +245,11 @@ public class TestLogAggregationService extends BaseContainerManagerTest { } @Test - @SuppressWarnings("unchecked") public void testNoContainerOnNode() throws Exception { this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); - DrainDispatcher dispatcher = createDispatcher(); - EventHandler appEventHandler = mock(EventHandler.class); - dispatcher.register(ApplicationEventType.class, appEventHandler); - LogAggregationService logAggregationService = new LogAggregationService(dispatcher, this.context, this.delSrvc, super.dirsHandler); @@ -285,6 +289,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { }; checkEvents(appEventHandler, expectedEvents, true, "getType", "getApplicationID"); dispatcher.stop(); + logAggregationService.close(); } @Test @@ -294,6 +299,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest { this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); + String[] fileNames = new String[] { "stdout", "stderr", "syslog" }; DrainDispatcher dispatcher = createDispatcher(); EventHandler appEventHandler = mock(EventHandler.class); @@ -432,17 +438,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest { } @Test - @SuppressWarnings("unchecked") public void testVerifyAndCreateRemoteDirsFailure() throws Exception { this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); - DrainDispatcher dispatcher = createDispatcher(); - EventHandler appEventHandler = mock(EventHandler.class); - dispatcher.register(ApplicationEventType.class, appEventHandler); - LogAggregationService logAggregationService = spy( new LogAggregationService(dispatcher, this.context, this.delSrvc, super.dirsHandler)); @@ -456,8 +457,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest { logAggregationService.start(); // Now try to start an application - ApplicationId appId = BuilderUtils.newApplicationId( - System.currentTimeMillis(), (int)Math.random()); + ApplicationId appId = + BuilderUtils.newApplicationId(System.currentTimeMillis(), + (int) (Math.random() * 1000)); logAggregationService.handle(new LogHandlerAppStartedEvent(appId, this.user, null, ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, @@ -475,8 +477,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest { Mockito.reset(logAggregationService); // Now try to start another one - ApplicationId appId2 = BuilderUtils.newApplicationId( - System.currentTimeMillis(), (int)Math.random()); + ApplicationId appId2 = + BuilderUtils.newApplicationId(System.currentTimeMillis(), + (int) (Math.random() * 1000)); File appLogDir = new File(localLogDir, ConverterUtils.toString(appId2)); appLogDir.mkdir(); @@ -578,6 +581,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest { aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls)); verify(spyFs, never()).mkdirs(eq(appDir3), isA(FsPermission.class)); + aggSvc.stop(); + aggSvc.close(); } @Test @@ -588,19 +593,16 @@ public class TestLogAggregationService extends BaseContainerManagerTest { localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); - - DrainDispatcher dispatcher = createDispatcher(); - EventHandler appEventHandler = mock(EventHandler.class); - dispatcher.register(ApplicationEventType.class, appEventHandler); - + LogAggregationService logAggregationService = spy( new LogAggregationService(dispatcher, this.context, this.delSrvc, super.dirsHandler)); logAggregationService.init(this.conf); logAggregationService.start(); - ApplicationId appId = BuilderUtils.newApplicationId( - System.currentTimeMillis(), (int)Math.random()); + ApplicationId appId = + BuilderUtils.newApplicationId(System.currentTimeMillis(), + (int) (Math.random() * 1000)); doThrow(new YarnRuntimeException("KABOOM!")) .when(logAggregationService).initAppAggregator( eq(appId), eq(user), any(Credentials.class), @@ -634,26 +636,22 @@ public class TestLogAggregationService extends BaseContainerManagerTest { } @Test - @SuppressWarnings("unchecked") public void testLogAggregationCreateDirsFailsWithoutKillingNM() throws Exception { this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); - - DrainDispatcher dispatcher = createDispatcher(); - EventHandler appEventHandler = mock(EventHandler.class); - dispatcher.register(ApplicationEventType.class, appEventHandler); - + LogAggregationService logAggregationService = spy( new LogAggregationService(dispatcher, this.context, this.delSrvc, super.dirsHandler)); logAggregationService.init(this.conf); logAggregationService.start(); - ApplicationId appId = BuilderUtils.newApplicationId( - System.currentTimeMillis(), (int)Math.random()); + ApplicationId appId = + BuilderUtils.newApplicationId(System.currentTimeMillis(), + (int) (Math.random() * 1000)); Exception e = new RuntimeException("KABOOM!"); doThrow(e) .when(logAggregationService).createAppDir(any(String.class), @@ -905,18 +903,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest { } @Test(timeout=20000) - @SuppressWarnings("unchecked") public void testStopAfterError() throws Exception { DeletionService delSrvc = mock(DeletionService.class); // get the AppLogAggregationImpl thread to crash LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class); when(mockedDirSvc.getLogDirs()).thenThrow(new RuntimeException()); - - DrainDispatcher dispatcher = createDispatcher(); - EventHandler appEventHandler = mock(EventHandler.class); - dispatcher.register(ApplicationEventType.class, appEventHandler); - + LogAggregationService logAggregationService = new LogAggregationService(dispatcher, this.context, delSrvc, mockedDirSvc); @@ -930,20 +923,16 @@ public class TestLogAggregationService extends BaseContainerManagerTest { logAggregationService.stop(); assertEquals(0, logAggregationService.getNumAggregators()); + logAggregationService.close(); } @Test - @SuppressWarnings("unchecked") public void testLogAggregatorCleanup() throws Exception { DeletionService delSrvc = mock(DeletionService.class); // get the AppLogAggregationImpl thread to crash LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class); - DrainDispatcher dispatcher = createDispatcher(); - EventHandler appEventHandler = mock(EventHandler.class); - dispatcher.register(ApplicationEventType.class, appEventHandler); - LogAggregationService logAggregationService = new LogAggregationService(dispatcher, this.context, delSrvc, mockedDirSvc); @@ -964,6 +953,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest { } Assert.assertEquals("Log aggregator failed to cleanup!", 0, logAggregationService.getNumAggregators()); + logAggregationService.stop(); + logAggregationService.close(); } @SuppressWarnings("unchecked") @@ -1039,6 +1030,72 @@ public class TestLogAggregationService extends BaseContainerManagerTest { return sb.toString(); } + /* + * Test to make sure we handle cases where the directories we get back from + * the LocalDirsHandler may have issues including the log dir not being + * present as well as other issues. The test uses helper functions from + * TestNonAggregatingLogHandler. + */ + @Test + public void testFailedDirsLocalFileDeletionAfterUpload() throws Exception { + + // setup conf and services + DeletionService mockDelService = mock(DeletionService.class); + File[] localLogDirs = + TestNonAggregatingLogHandler.getLocalLogDirFiles(this.getClass() + .getName(), 7); + final List localLogDirPaths = + new ArrayList(localLogDirs.length); + for (int i = 0; i < localLogDirs.length; i++) { + localLogDirPaths.add(localLogDirs[i].getAbsolutePath()); + } + + String localLogDirsString = StringUtils.join(localLogDirPaths, ","); + + this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString); + this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + this.remoteRootLogDir.getAbsolutePath()); + this.conf.setLong(YarnConfiguration.NM_DISK_HEALTH_CHECK_INTERVAL_MS, 500); + + ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1); + ApplicationAttemptId appAttemptId = + BuilderUtils.newApplicationAttemptId(application1, 1); + + this.dirsHandler = new LocalDirsHandlerService(); + LocalDirsHandlerService mockDirsHandler = mock(LocalDirsHandlerService.class); + + LogAggregationService logAggregationService = + spy(new LogAggregationService(dispatcher, this.context, mockDelService, + mockDirsHandler)); + AbstractFileSystem spylfs = + spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); + FileContext lfs = FileContext.getFileContext(spylfs, conf); + doReturn(lfs).when(logAggregationService).getLocalFileContext( + isA(Configuration.class)); + + logAggregationService.init(this.conf); + logAggregationService.start(); + + TestNonAggregatingLogHandler.runMockedFailedDirs(logAggregationService, + application1, user, mockDelService, mockDirsHandler, conf, spylfs, lfs, + localLogDirs); + + logAggregationService.stop(); + assertEquals(0, logAggregationService.getNumAggregators()); + verify(logAggregationService).closeFileSystems( + any(UserGroupInformation.class)); + + ApplicationEvent expectedEvents[] = + new ApplicationEvent[] { + new ApplicationEvent(appAttemptId.getApplicationId(), + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), + new ApplicationEvent(appAttemptId.getApplicationId(), + ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED) }; + + checkEvents(appEventHandler, expectedEvents, true, "getType", + "getApplicationID"); + } + @Test (timeout = 50000) @SuppressWarnings("unchecked") public void testLogAggregationServiceWithPatterns() throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java index 300ca286bab..d0f647235db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java @@ -19,15 +19,36 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.NotSerializableException; +import java.io.ObjectInputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.HashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.builder.EqualsBuilder; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.AbstractFileSystem; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -45,25 +66,52 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; import org.mockito.exceptions.verification.WantedButNotInvoked; +import org.mockito.internal.matchers.VarargMatcher; public class TestNonAggregatingLogHandler { + + DeletionService mockDelService; + Configuration conf; + DrainDispatcher dispatcher; + EventHandler appEventHandler; + String user = "testuser"; + ApplicationId appId; + ApplicationAttemptId appAttemptId; + ContainerId container11; + LocalDirsHandlerService dirsHandler; + + @Before + @SuppressWarnings("unchecked") + public void setup() { + mockDelService = mock(DeletionService.class); + conf = new YarnConfiguration(); + dispatcher = createDispatcher(conf); + appEventHandler = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, appEventHandler); + appId = BuilderUtils.newApplicationId(1234, 1); + appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 1); + container11 = BuilderUtils.newContainerId(appAttemptId, 1); + dirsHandler = new LocalDirsHandlerService(); + } + + @After + public void tearDown() throws IOException { + dirsHandler.stop(); + dirsHandler.close(); + dispatcher.await(); + dispatcher.stop(); + dispatcher.close(); + } @Test - @SuppressWarnings("unchecked") - public void testLogDeletion() { - DeletionService delService = mock(DeletionService.class); - Configuration conf = new YarnConfiguration(); - String user = "testuser"; - - File[] localLogDirs = new File[2]; - localLogDirs[0] = - new File("target", this.getClass().getName() + "-localLogDir0") - .getAbsoluteFile(); - localLogDirs[1] = - new File("target", this.getClass().getName() + "-localLogDir1") - .getAbsoluteFile(); + public void testLogDeletion() throws IOException { + File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 2); String localLogDirsString = localLogDirs[0].getAbsolutePath() + "," + localLogDirs[1].getAbsolutePath(); @@ -72,72 +120,50 @@ public class TestNonAggregatingLogHandler { conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false); conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 0l); - DrainDispatcher dispatcher = createDispatcher(conf); - EventHandler appEventHandler = mock(EventHandler.class); - dispatcher.register(ApplicationEventType.class, appEventHandler); - - LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); dirsHandler.init(conf); - ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1); - ApplicationAttemptId appAttemptId1 = - BuilderUtils.newApplicationAttemptId(appId1, 1); - ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1); + NonAggregatingLogHandler rawLogHandler = + new NonAggregatingLogHandler(dispatcher, mockDelService, dirsHandler); + NonAggregatingLogHandler logHandler = spy(rawLogHandler); + AbstractFileSystem spylfs = + spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); + FileContext lfs = FileContext.getFileContext(spylfs, conf); + doReturn(lfs).when(logHandler) + .getLocalFileContext(isA(Configuration.class)); + FsPermission defaultPermission = + FsPermission.getDirDefault().applyUMask(lfs.getUMask()); + final FileStatus fs = + new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0, + defaultPermission, "", "", + new Path(localLogDirs[0].getAbsolutePath())); + doReturn(fs).when(spylfs).getFileStatus(isA(Path.class)); - NonAggregatingLogHandler logHandler = - new NonAggregatingLogHandler(dispatcher, delService, dirsHandler); logHandler.init(conf); logHandler.start(); - logHandler.handle(new LogHandlerAppStartedEvent(appId1, user, null, + logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, null)); logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0)); - logHandler.handle(new LogHandlerAppFinishedEvent(appId1)); + logHandler.handle(new LogHandlerAppFinishedEvent(appId)); Path[] localAppLogDirs = new Path[2]; localAppLogDirs[0] = - new Path(localLogDirs[0].getAbsolutePath(), appId1.toString()); + new Path(localLogDirs[0].getAbsolutePath(), appId.toString()); localAppLogDirs[1] = - new Path(localLogDirs[1].getAbsolutePath(), appId1.toString()); + new Path(localLogDirs[1].getAbsolutePath(), appId.toString()); - // 5 seconds for the delete which is a separate thread. - long verifyStartTime = System.currentTimeMillis(); - WantedButNotInvoked notInvokedException = null; - boolean matched = false; - while (!matched && System.currentTimeMillis() < verifyStartTime + 5000l) { - try { - verify(delService).delete(eq(user), (Path) eq(null), - eq(localAppLogDirs[0]), eq(localAppLogDirs[1])); - matched = true; - } catch (WantedButNotInvoked e) { - notInvokedException = e; - try { - Thread.sleep(50l); - } catch (InterruptedException i) { - } - } - } - if (!matched) { - throw notInvokedException; + testDeletionServiceCall(mockDelService, user, 5000, localAppLogDirs); + logHandler.close(); + for (int i = 0; i < localLogDirs.length; i++) { + FileUtils.deleteDirectory(localLogDirs[i]); } } @Test - @SuppressWarnings("unchecked") - public void testDelayedDelete() { - DeletionService delService = mock(DeletionService.class); - Configuration conf = new YarnConfiguration(); - String user = "testuser"; - - File[] localLogDirs = new File[2]; - localLogDirs[0] = - new File("target", this.getClass().getName() + "-localLogDir0") - .getAbsoluteFile(); - localLogDirs[1] = - new File("target", this.getClass().getName() + "-localLogDir1") - .getAbsoluteFile(); + public void testDelayedDelete() throws IOException { + File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 2); String localLogDirsString = localLogDirs[0].getAbsolutePath() + "," + localLogDirs[1].getAbsolutePath(); @@ -148,42 +174,36 @@ public class TestNonAggregatingLogHandler { conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, YarnConfiguration.DEFAULT_NM_LOG_RETAIN_SECONDS); - DrainDispatcher dispatcher = createDispatcher(conf); - EventHandler appEventHandler = mock(EventHandler.class); - dispatcher.register(ApplicationEventType.class, appEventHandler); - - LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); dirsHandler.init(conf); - ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1); - ApplicationAttemptId appAttemptId1 = - BuilderUtils.newApplicationAttemptId(appId1, 1); - ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1); - NonAggregatingLogHandler logHandler = - new NonAggregatingLogHandlerWithMockExecutor(dispatcher, delService, + new NonAggregatingLogHandlerWithMockExecutor(dispatcher, mockDelService, dirsHandler); logHandler.init(conf); logHandler.start(); - logHandler.handle(new LogHandlerAppStartedEvent(appId1, user, null, + logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, null)); logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0)); - logHandler.handle(new LogHandlerAppFinishedEvent(appId1)); + logHandler.handle(new LogHandlerAppFinishedEvent(appId)); Path[] localAppLogDirs = new Path[2]; localAppLogDirs[0] = - new Path(localLogDirs[0].getAbsolutePath(), appId1.toString()); + new Path(localLogDirs[0].getAbsolutePath(), appId.toString()); localAppLogDirs[1] = - new Path(localLogDirs[1].getAbsolutePath(), appId1.toString()); + new Path(localLogDirs[1].getAbsolutePath(), appId.toString()); ScheduledThreadPoolExecutor mockSched = ((NonAggregatingLogHandlerWithMockExecutor) logHandler).mockSched; verify(mockSched).schedule(any(Runnable.class), eq(10800l), eq(TimeUnit.SECONDS)); + logHandler.close(); + for (int i = 0; i < localLogDirs.length; i++) { + FileUtils.deleteDirectory(localLogDirs[i]); + } } @Test @@ -202,25 +222,25 @@ public class TestNonAggregatingLogHandler { verify(logHandler.mockSched) .awaitTermination(eq(10l), eq(TimeUnit.SECONDS)); verify(logHandler.mockSched).shutdownNow(); + logHandler.close(); + aggregatingLogHandler.close(); } @Test - public void testHandlingApplicationFinishedEvent() { - Configuration conf = new Configuration(); - LocalDirsHandlerService dirsService = new LocalDirsHandlerService(); + public void testHandlingApplicationFinishedEvent() throws IOException { DeletionService delService = new DeletionService(null); NonAggregatingLogHandler aggregatingLogHandler = new NonAggregatingLogHandler(new InlineDispatcher(), delService, - dirsService); + dirsHandler); - dirsService.init(conf); - dirsService.start(); + dirsHandler.init(conf); + dirsHandler.start(); delService.init(conf); delService.start(); aggregatingLogHandler.init(conf); aggregatingLogHandler.start(); - ApplicationId appId = BuilderUtils.newApplicationId(1234, 1); + // It should NOT throw RejectedExecutionException aggregatingLogHandler.handle(new LogHandlerAppFinishedEvent(appId)); aggregatingLogHandler.stop(); @@ -228,6 +248,7 @@ public class TestNonAggregatingLogHandler { // It should NOT throw RejectedExecutionException after stopping // handler service. aggregatingLogHandler.handle(new LogHandlerAppFinishedEvent(appId)); + aggregatingLogHandler.close(); } private class NonAggregatingLogHandlerWithMockExecutor extends @@ -255,4 +276,201 @@ public class TestNonAggregatingLogHandler { dispatcher.start(); return dispatcher; } + + /* + * Test to ensure that we handle the cleanup of directories that may not have + * the application log dirs we're trying to delete or may have other problems. + * Test creates 7 log dirs, and fails the directory check for 4 of them and + * then checks to ensure we tried to delete only the ones that passed the + * check. + */ + @Test + public void testFailedDirLogDeletion() throws Exception { + + File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 7); + final List localLogDirPaths = + new ArrayList(localLogDirs.length); + for (int i = 0; i < localLogDirs.length; i++) { + localLogDirPaths.add(localLogDirs[i].getAbsolutePath()); + } + + String localLogDirsString = StringUtils.join(localLogDirPaths, ","); + + conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString); + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false); + conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 0l); + + LocalDirsHandlerService mockDirsHandler = mock(LocalDirsHandlerService.class); + + NonAggregatingLogHandler rawLogHandler = + new NonAggregatingLogHandler(dispatcher, mockDelService, mockDirsHandler); + NonAggregatingLogHandler logHandler = spy(rawLogHandler); + AbstractFileSystem spylfs = + spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); + FileContext lfs = FileContext.getFileContext(spylfs, conf); + doReturn(lfs).when(logHandler) + .getLocalFileContext(isA(Configuration.class)); + logHandler.init(conf); + logHandler.start(); + runMockedFailedDirs(logHandler, appId, user, mockDelService, + mockDirsHandler, conf, spylfs, lfs, localLogDirs); + logHandler.close(); + } + + /** + * Function to run a log handler with directories failing the getFileStatus + * call. The function accepts the log handler, setup the mocks to fail with + * specific exceptions and ensures the deletion service has the correct calls. + * + * @param logHandler the logHandler implementation to test + * + * @param appId the application id that we wish when sending events to the log + * handler + * + * @param user the user name to use + * + * @param mockDelService a mock of the DeletionService which we will verify + * the delete calls against + * + * @param dirsHandler a spy or mock on the LocalDirsHandler service used to + * when creating the logHandler. It needs to be a spy so that we can intercept + * the getAllLogDirs() call. + * + * @param conf the configuration used + * + * @param spylfs a spy on the AbstractFileSystem object used when creating lfs + * + * @param lfs the FileContext object to be used to mock the getFileStatus() + * calls + * + * @param localLogDirs list of the log dirs to run the test against, must have + * at least 7 entries + */ + public static void runMockedFailedDirs(LogHandler logHandler, + ApplicationId appId, String user, DeletionService mockDelService, + LocalDirsHandlerService dirsHandler, Configuration conf, + AbstractFileSystem spylfs, FileContext lfs, File[] localLogDirs) + throws Exception { + Map appAcls = new HashMap(); + if (localLogDirs.length < 7) { + throw new IllegalArgumentException( + "Argument localLogDirs must be at least of length 7"); + } + Path[] localAppLogDirPaths = new Path[localLogDirs.length]; + for (int i = 0; i < localAppLogDirPaths.length; i++) { + localAppLogDirPaths[i] = + new Path(localLogDirs[i].getAbsolutePath(), appId.toString()); + } + final List localLogDirPaths = + new ArrayList(localLogDirs.length); + for (int i = 0; i < localLogDirs.length; i++) { + localLogDirPaths.add(localLogDirs[i].getAbsolutePath()); + } + + // setup mocks + FsPermission defaultPermission = + FsPermission.getDirDefault().applyUMask(lfs.getUMask()); + final FileStatus fs = + new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0, + defaultPermission, "", "", + new Path(localLogDirs[0].getAbsolutePath())); + doReturn(fs).when(spylfs).getFileStatus(isA(Path.class)); + doReturn(localLogDirPaths).when(dirsHandler).getLogDirsForCleanup(); + + logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, + ContainerLogsRetentionPolicy.ALL_CONTAINERS, appAcls)); + + // test case where some dirs have the log dir to delete + // mock some dirs throwing various exceptions + // verify deletion happens only on the others + Mockito.doThrow(new FileNotFoundException()).when(spylfs) + .getFileStatus(eq(localAppLogDirPaths[0])); + doReturn(fs).when(spylfs).getFileStatus(eq(localAppLogDirPaths[1])); + Mockito.doThrow(new AccessControlException()).when(spylfs) + .getFileStatus(eq(localAppLogDirPaths[2])); + doReturn(fs).when(spylfs).getFileStatus(eq(localAppLogDirPaths[3])); + Mockito.doThrow(new IOException()).when(spylfs) + .getFileStatus(eq(localAppLogDirPaths[4])); + Mockito.doThrow(new UnsupportedFileSystemException("test")).when(spylfs) + .getFileStatus(eq(localAppLogDirPaths[5])); + doReturn(fs).when(spylfs).getFileStatus(eq(localAppLogDirPaths[6])); + + logHandler.handle(new LogHandlerAppFinishedEvent(appId)); + + testDeletionServiceCall(mockDelService, user, 5000, localAppLogDirPaths[1], + localAppLogDirPaths[3], localAppLogDirPaths[6]); + + return; + } + + static class DeletePathsMatcher extends ArgumentMatcher implements + VarargMatcher { + + // to get rid of serialization warning + static final long serialVersionUID = 0; + + private transient Path[] matchPaths; + + DeletePathsMatcher(Path... matchPaths) { + this.matchPaths = matchPaths; + } + + @Override + public boolean matches(Object varargs) { + return new EqualsBuilder().append(matchPaths, varargs).isEquals(); + } + + // function to get rid of FindBugs warning + private void readObject(ObjectInputStream os) throws NotSerializableException { + throw new NotSerializableException(this.getClass().getName()); + } + } + + /** + * Function to verify that the DeletionService object received the right + * requests. + * + * @param delService the DeletionService mock which we verify against + * + * @param user the user name to use when verifying the deletion + * + * @param timeout amount in milliseconds to wait before we decide the calls + * didn't come through + * + * @param matchPaths the paths to match in the delete calls + * + * @throws WantedButNotInvoked if the calls could not be verified + */ + static void testDeletionServiceCall(DeletionService delService, String user, + long timeout, Path... matchPaths) { + + long verifyStartTime = System.currentTimeMillis(); + WantedButNotInvoked notInvokedException = null; + boolean matched = false; + while (!matched && System.currentTimeMillis() < verifyStartTime + timeout) { + try { + verify(delService).delete(eq(user), (Path) eq(null), + Mockito.argThat(new DeletePathsMatcher(matchPaths))); + matched = true; + } catch (WantedButNotInvoked e) { + notInvokedException = e; + try { + Thread.sleep(50l); + } catch (InterruptedException i) { + } + } + } + if (!matched) { + throw notInvokedException; + } + return; + } + + public static File[] getLocalLogDirFiles(String name, int number) { + File[] dirs = new File[number]; + for (int i = 0; i < dirs.length; i++) { + dirs[i] = new File("target", name + "-localLogDir" + i).getAbsoluteFile(); + } + return dirs; + } }