YARN-90. NodeManager should identify failed disks becoming good again. Contributed by Varun Vasudev

(cherry picked from commit 6f2028bd15)
This commit is contained in:
Jason Lowe 2014-10-21 17:29:22 +00:00
parent fd7ba56f6a
commit 3820bf055e
15 changed files with 1444 additions and 292 deletions

View File

@ -349,6 +349,9 @@ Release 2.6.0 - UNRELEASED
YARN-2582. Fixed Log CLI and Web UI for showing aggregated logs of LRS. (Xuan
Gong via zjshen)
YARN-90. NodeManager should identify failed disks becoming good again
(Varun Vasudev via jlowe)
OPTIMIZATIONS
BUG FIXES

View File

@ -21,18 +21,23 @@ package org.apache.hadoop.yarn.server.nodemanager;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
/**
* Manages a list of local storage directories.
@ -40,9 +45,38 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
class DirectoryCollection {
private static final Log LOG = LogFactory.getLog(DirectoryCollection.class);
public enum DiskErrorCause {
DISK_FULL, OTHER
}
static class DiskErrorInformation {
DiskErrorCause cause;
String message;
DiskErrorInformation(DiskErrorCause cause, String message) {
this.cause = cause;
this.message = message;
}
}
/**
* Returns a merged list which contains all the elements of l1 and l2
* @param l1 the first list to be included
* @param l2 the second list to be included
* @return a new list containing all the elements of the first and second list
*/
static List<String> concat(List<String> l1, List<String> l2) {
List<String> ret = new ArrayList<String>(l1.size() + l2.size());
ret.addAll(l1);
ret.addAll(l2);
return ret;
}
// Good local storage directories
private List<String> localDirs;
private List<String> failedDirs;
private List<String> errorDirs;
private List<String> fullDirs;
private int numFailures;
private float diskUtilizationPercentageCutoff;
@ -109,7 +143,9 @@ class DirectoryCollection {
float utilizationPercentageCutOff,
long utilizationSpaceCutOff) {
localDirs = new CopyOnWriteArrayList<String>(dirs);
failedDirs = new CopyOnWriteArrayList<String>();
errorDirs = new CopyOnWriteArrayList<String>();
fullDirs = new CopyOnWriteArrayList<String>();
diskUtilizationPercentageCutoff = utilizationPercentageCutOff;
diskUtilizationSpaceCutoff = utilizationSpaceCutOff;
diskUtilizationPercentageCutoff =
@ -131,7 +167,16 @@ class DirectoryCollection {
* @return the failed directories
*/
synchronized List<String> getFailedDirs() {
return Collections.unmodifiableList(failedDirs);
return Collections.unmodifiableList(
DirectoryCollection.concat(errorDirs, fullDirs));
}
/**
* @return the directories that have used all disk space
*/
synchronized List<String> getFullDirs() {
return fullDirs;
}
/**
@ -158,7 +203,7 @@ class DirectoryCollection {
LOG.warn("Unable to create directory " + dir + " error " +
e.getMessage() + ", removing from the list of valid directories.");
localDirs.remove(dir);
failedDirs.add(dir);
errorDirs.add(dir);
numFailures++;
failed = true;
}
@ -167,61 +212,147 @@ class DirectoryCollection {
}
/**
* Check the health of current set of local directories, updating the list
* of valid directories if necessary.
* @return <em>true</em> if there is a new disk-failure identified in
* this checking. <em>false</em> otherwise.
* Check the health of current set of local directories(good and failed),
* updating the list of valid directories if necessary.
*
* @return <em>true</em> if there is a new disk-failure identified in this
* checking or a failed directory passes the disk check <em>false</em>
* otherwise.
*/
synchronized boolean checkDirs() {
int oldNumFailures = numFailures;
HashSet<String> checkFailedDirs = new HashSet<String>();
for (final String dir : localDirs) {
boolean setChanged = false;
Set<String> preCheckGoodDirs = new HashSet<String>(localDirs);
Set<String> preCheckFullDirs = new HashSet<String>(fullDirs);
Set<String> preCheckOtherErrorDirs = new HashSet<String>(errorDirs);
List<String> failedDirs = DirectoryCollection.concat(errorDirs, fullDirs);
List<String> allLocalDirs =
DirectoryCollection.concat(localDirs, failedDirs);
Map<String, DiskErrorInformation> dirsFailedCheck = testDirs(allLocalDirs);
localDirs.clear();
errorDirs.clear();
fullDirs.clear();
for (Map.Entry<String, DiskErrorInformation> entry : dirsFailedCheck
.entrySet()) {
String dir = entry.getKey();
DiskErrorInformation errorInformation = entry.getValue();
switch (entry.getValue().cause) {
case DISK_FULL:
fullDirs.add(entry.getKey());
break;
case OTHER:
errorDirs.add(entry.getKey());
break;
}
if (preCheckGoodDirs.contains(dir)) {
LOG.warn("Directory " + dir + " error, " + errorInformation.message
+ ", removing from list of valid directories");
setChanged = true;
numFailures++;
}
}
for (String dir : allLocalDirs) {
if (!dirsFailedCheck.containsKey(dir)) {
localDirs.add(dir);
if (preCheckFullDirs.contains(dir)
|| preCheckOtherErrorDirs.contains(dir)) {
setChanged = true;
LOG.info("Directory " + dir
+ " passed disk check, adding to list of valid directories.");
}
}
}
Set<String> postCheckFullDirs = new HashSet<String>(fullDirs);
Set<String> postCheckOtherDirs = new HashSet<String>(errorDirs);
for (String dir : preCheckFullDirs) {
if (postCheckOtherDirs.contains(dir)) {
LOG.warn("Directory " + dir + " error "
+ dirsFailedCheck.get(dir).message);
}
}
for (String dir : preCheckOtherErrorDirs) {
if (postCheckFullDirs.contains(dir)) {
LOG.warn("Directory " + dir + " error "
+ dirsFailedCheck.get(dir).message);
}
}
return setChanged;
}
Map<String, DiskErrorInformation> testDirs(List<String> dirs) {
HashMap<String, DiskErrorInformation> ret =
new HashMap<String, DiskErrorInformation>();
for (final String dir : dirs) {
String msg;
try {
File testDir = new File(dir);
DiskChecker.checkDir(testDir);
if (isDiskUsageUnderPercentageLimit(testDir)) {
LOG.warn("Directory " + dir
+ " error, used space above threshold of "
if (isDiskUsageOverPercentageLimit(testDir)) {
msg =
"used space above threshold of "
+ diskUtilizationPercentageCutoff
+ "%, removing from the list of valid directories.");
checkFailedDirs.add(dir);
} else if (isDiskFreeSpaceWithinLimit(testDir)) {
LOG.warn("Directory " + dir + " error, free space below limit of "
+ diskUtilizationSpaceCutoff
+ "MB, removing from the list of valid directories.");
checkFailedDirs.add(dir);
}
} catch (DiskErrorException de) {
LOG.warn("Directory " + dir + " error " + de.getMessage()
+ ", removing from the list of valid directories.");
checkFailedDirs.add(dir);
}
}
for (String dir : checkFailedDirs) {
localDirs.remove(dir);
failedDirs.add(dir);
numFailures++;
}
return numFailures > oldNumFailures;
+ "%";
ret.put(dir,
new DiskErrorInformation(DiskErrorCause.DISK_FULL, msg));
continue;
} else if (isDiskFreeSpaceUnderLimit(testDir)) {
msg =
"free space below limit of " + diskUtilizationSpaceCutoff
+ "MB";
ret.put(dir,
new DiskErrorInformation(DiskErrorCause.DISK_FULL, msg));
continue;
}
private boolean isDiskUsageUnderPercentageLimit(File dir) {
// create a random dir to make sure fs isn't in read-only mode
verifyDirUsingMkdir(testDir);
} catch (IOException ie) {
ret.put(dir,
new DiskErrorInformation(DiskErrorCause.OTHER, ie.getMessage()));
}
}
return ret;
}
/**
* Function to test whether a dir is working correctly by actually creating a
* random directory.
*
* @param dir
* the dir to test
*/
private void verifyDirUsingMkdir(File dir) throws IOException {
String randomDirName = RandomStringUtils.randomAlphanumeric(5);
File target = new File(dir, randomDirName);
int i = 0;
while (target.exists()) {
randomDirName = RandomStringUtils.randomAlphanumeric(5) + i;
target = new File(dir, randomDirName);
i++;
}
try {
DiskChecker.checkDir(target);
} finally {
FileUtils.deleteQuietly(target);
}
}
private boolean isDiskUsageOverPercentageLimit(File dir) {
float freePercentage =
100 * (dir.getUsableSpace() / (float) dir.getTotalSpace());
float usedPercentage = 100.0F - freePercentage;
if (usedPercentage > diskUtilizationPercentageCutoff
|| usedPercentage >= 100.0F) {
return true;
}
return false;
return (usedPercentage > diskUtilizationPercentageCutoff
|| usedPercentage >= 100.0F);
}
private boolean isDiskFreeSpaceWithinLimit(File dir) {
private boolean isDiskFreeSpaceUnderLimit(File dir) {
long freeSpace = dir.getUsableSpace() / (1024 * 1024);
if (freeSpace < this.diskUtilizationSpaceCutoff) {
return true;
}
return false;
return freeSpace < this.diskUtilizationSpaceCutoff;
}
private void createDir(FileContext localFs, Path dir, FsPermission perm)

View File

@ -21,7 +21,9 @@ package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
@ -150,7 +152,7 @@ public class LocalDirsHandlerService extends AbstractService {
boolean createSucceeded = localDirs.createNonExistentDirs(localFs, perm);
createSucceeded &= logDirs.createNonExistentDirs(localFs, perm);
if (!createSucceeded) {
updateDirsAfterFailure();
updateDirsAfterTest();
}
// Check the disk health immediately to weed out bad directories
@ -197,9 +199,52 @@ public class LocalDirsHandlerService extends AbstractService {
}
/**
* @return the local directories which have no disk space
*/
public List<String> getDiskFullLocalDirs() {
return localDirs.getFullDirs();
}
/**
* @return the log directories that have no disk space
*/
public List<String> getDiskFullLogDirs() {
return logDirs.getFullDirs();
}
/**
* Function to get the local dirs which should be considered when cleaning up
* resources. Contains the good local dirs and the local dirs that have reached
* the disk space limit
*
* @return the local dirs which should be considered for cleaning up
*/
public List<String> getLocalDirsForCleanup() {
return DirectoryCollection.concat(localDirs.getGoodDirs(),
localDirs.getFullDirs());
}
/**
* Function to get the log dirs which should be considered when cleaning up
* resources. Contains the good log dirs and the log dirs that have reached
* the disk space limit
*
* @return the log dirs which should be considered for cleaning up
*/
public List<String> getLogDirsForCleanup() {
return DirectoryCollection.concat(logDirs.getGoodDirs(),
logDirs.getFullDirs());
}
/**
* Function to generate a report on the state of the disks.
*
* @param listGoodDirs
* flag to determine whether the report should report the state of
* good dirs or failed dirs
* @return the health report of nm-local-dirs and nm-log-dirs
*/
public String getDisksHealthReport() {
public String getDisksHealthReport(boolean listGoodDirs) {
if (!isDiskHealthCheckerEnabled) {
return "";
}
@ -207,20 +252,31 @@ public class LocalDirsHandlerService extends AbstractService {
StringBuilder report = new StringBuilder();
List<String> failedLocalDirsList = localDirs.getFailedDirs();
List<String> failedLogDirsList = logDirs.getFailedDirs();
int numLocalDirs = localDirs.getGoodDirs().size()
+ failedLocalDirsList.size();
int numLogDirs = logDirs.getGoodDirs().size() + failedLogDirsList.size();
List<String> goodLocalDirsList = localDirs.getGoodDirs();
List<String> goodLogDirsList = logDirs.getGoodDirs();
int numLocalDirs = goodLocalDirsList.size() + failedLocalDirsList.size();
int numLogDirs = goodLogDirsList.size() + failedLogDirsList.size();
if (!listGoodDirs) {
if (!failedLocalDirsList.isEmpty()) {
report.append(failedLocalDirsList.size() + "/" + numLocalDirs
+ " local-dirs turned bad: "
+ StringUtils.join(",", failedLocalDirsList) + ";");
+ " local-dirs are bad: "
+ StringUtils.join(",", failedLocalDirsList) + "; ");
}
if (!failedLogDirsList.isEmpty()) {
report.append(failedLogDirsList.size() + "/" + numLogDirs
+ " log-dirs turned bad: "
+ StringUtils.join(",", failedLogDirsList));
+ " log-dirs are bad: " + StringUtils.join(",", failedLogDirsList));
}
} else {
report.append(goodLocalDirsList.size() + "/" + numLocalDirs
+ " local-dirs are good: " + StringUtils.join(",", goodLocalDirsList)
+ "; ");
report.append(goodLogDirsList.size() + "/" + numLogDirs
+ " log-dirs are good: " + StringUtils.join(",", goodLogDirsList));
}
return report.toString();
}
/**
@ -262,8 +318,8 @@ public class LocalDirsHandlerService extends AbstractService {
* Set good local dirs and good log dirs in the configuration so that the
* LocalDirAllocator objects will use this updated configuration only.
*/
private void updateDirsAfterFailure() {
LOG.info("Disk(s) failed. " + getDisksHealthReport());
private void updateDirsAfterTest() {
Configuration conf = getConfig();
List<String> localDirs = getLocalDirs();
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS,
@ -273,25 +329,93 @@ public class LocalDirsHandlerService extends AbstractService {
logDirs.toArray(new String[logDirs.size()]));
if (!areDisksHealthy()) {
// Just log.
LOG.error("Most of the disks failed. " + getDisksHealthReport());
LOG.error("Most of the disks failed. " + getDisksHealthReport(false));
}
}
private void logDiskStatus(boolean newDiskFailure, boolean diskTurnedGood) {
if (newDiskFailure) {
String report = getDisksHealthReport(false);
LOG.info("Disk(s) failed: " + report);
}
if (diskTurnedGood) {
String report = getDisksHealthReport(true);
LOG.info("Disk(s) turned good: " + report);
}
}
private void checkDirs() {
boolean newFailure = false;
boolean disksStatusChange = false;
Set<String> failedLocalDirsPreCheck =
new HashSet<String>(localDirs.getFailedDirs());
Set<String> failedLogDirsPreCheck =
new HashSet<String>(logDirs.getFailedDirs());
if (localDirs.checkDirs()) {
newFailure = true;
disksStatusChange = true;
}
if (logDirs.checkDirs()) {
newFailure = true;
disksStatusChange = true;
}
if (newFailure) {
updateDirsAfterFailure();
Set<String> failedLocalDirsPostCheck =
new HashSet<String>(localDirs.getFailedDirs());
Set<String> failedLogDirsPostCheck =
new HashSet<String>(logDirs.getFailedDirs());
boolean disksFailed = false;
boolean disksTurnedGood = false;
disksFailed =
disksTurnedBad(failedLocalDirsPreCheck, failedLocalDirsPostCheck);
disksTurnedGood =
disksTurnedGood(failedLocalDirsPreCheck, failedLocalDirsPostCheck);
// skip check if we have new failed or good local dirs since we're going to
// log anyway
if (!disksFailed) {
disksFailed =
disksTurnedBad(failedLogDirsPreCheck, failedLogDirsPostCheck);
}
if (!disksTurnedGood) {
disksTurnedGood =
disksTurnedGood(failedLogDirsPreCheck, failedLogDirsPostCheck);
}
logDiskStatus(disksFailed, disksTurnedGood);
if (disksStatusChange) {
updateDirsAfterTest();
}
lastDisksCheckTime = System.currentTimeMillis();
}
private boolean disksTurnedBad(Set<String> preCheckFailedDirs,
Set<String> postCheckDirs) {
boolean disksFailed = false;
for (String dir : postCheckDirs) {
if (!preCheckFailedDirs.contains(dir)) {
disksFailed = true;
break;
}
}
return disksFailed;
}
private boolean disksTurnedGood(Set<String> preCheckDirs,
Set<String> postCheckDirs) {
boolean disksTurnedGood = false;
for (String dir : preCheckDirs) {
if (!postCheckDirs.contains(dir)) {
disksTurnedGood = true;
break;
}
}
return disksTurnedGood;
}
public Path getLocalPathForWrite(String pathStr) throws IOException {
return localDirsAllocator.getLocalPathForWrite(pathStr, getConfig());
}

View File

@ -55,9 +55,9 @@ public class NodeHealthCheckerService extends CompositeService {
String scriptReport = (nodeHealthScriptRunner == null) ? ""
: nodeHealthScriptRunner.getHealthReport();
if (scriptReport.equals("")) {
return dirsHandler.getDisksHealthReport();
return dirsHandler.getDisksHealthReport(false);
} else {
return scriptReport.concat(SEPARATOR + dirsHandler.getDisksHealthReport());
return scriptReport.concat(SEPARATOR + dirsHandler.getDisksHealthReport(false));
}
}

View File

@ -240,7 +240,7 @@ public class ContainerLaunch implements Callable<Integer> {
if (!dirsHandler.areDisksHealthy()) {
ret = ContainerExitStatus.DISKS_FAILED;
throw new IOException("Most of the disks failed. "
+ dirsHandler.getDisksHealthReport());
+ dirsHandler.getDisksHealthReport(false));
}
try {

View File

@ -55,11 +55,13 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.Credentials;
@ -171,6 +173,8 @@ public class ResourceLocalizationService extends CompositeService
private final ConcurrentMap<String,LocalResourcesTracker> appRsrc =
new ConcurrentHashMap<String,LocalResourcesTracker>();
FileContext lfs;
public ResourceLocalizationService(Dispatcher dispatcher,
ContainerExecutor exec, DeletionService delService,
LocalDirsHandlerService dirsHandler, NMStateStoreService stateStore) {
@ -219,32 +223,17 @@ public class ResourceLocalizationService extends CompositeService
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
try {
FileContext lfs = getLocalFileContext(conf);
lfs.setUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK));
lfs = getLocalFileContext(conf);
lfs.setUMask(new FsPermission((short) FsPermission.DEFAULT_UMASK));
if (!stateStore.canRecover() || stateStore.isNewlyCreated()) {
cleanUpLocalDir(lfs,delService);
if (!stateStore.canRecover()|| stateStore.isNewlyCreated()) {
cleanUpLocalDirs(lfs, delService);
initializeLocalDirs(lfs);
initializeLogDirs(lfs);
}
List<String> localDirs = dirsHandler.getLocalDirs();
for (String localDir : localDirs) {
// $local/usercache
Path userDir = new Path(localDir, ContainerLocalizer.USERCACHE);
lfs.mkdir(userDir, null, true);
// $local/filecache
Path fileDir = new Path(localDir, ContainerLocalizer.FILECACHE);
lfs.mkdir(fileDir, null, true);
// $local/nmPrivate
Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
lfs.mkdir(sysDir, NM_PRIVATE_PERM, true);
}
List<String> logDirs = dirsHandler.getLogDirs();
for (String logDir : logDirs) {
lfs.mkdir(new Path(logDir), null, true);
}
} catch (IOException e) {
throw new YarnRuntimeException("Failed to initialize LocalizationService", e);
} catch (Exception e) {
throw new YarnRuntimeException(
"Failed to initialize LocalizationService", e);
}
cacheTargetSize =
@ -497,22 +486,27 @@ public class ResourceLocalizationService extends CompositeService
String containerIDStr = c.toString();
String appIDStr = ConverterUtils.toString(
c.getContainerId().getApplicationAttemptId().getApplicationId());
for (String localDir : dirsHandler.getLocalDirs()) {
// Try deleting from good local dirs and full local dirs because a dir might
// have gone bad while the app was running(disk full). In addition
// a dir might have become good while the app was running.
// Check if the container dir exists and if it does, try to delete it
for (String localDir : dirsHandler.getLocalDirsForCleanup()) {
// Delete the user-owned container-dir
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usersdir, userName);
Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
Path appDir = new Path(allAppsdir, appIDStr);
Path containerDir = new Path(appDir, containerIDStr);
delService.delete(userName, containerDir, new Path[] {});
submitDirForDeletion(userName, containerDir);
// Delete the nmPrivate container-dir
Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
Path appSysDir = new Path(sysDir, appIDStr);
Path containerSysDir = new Path(appSysDir, containerIDStr);
delService.delete(null, containerSysDir, new Path[] {});
submitDirForDeletion(null, containerSysDir);
}
dispatcher.getEventHandler().handle(
@ -520,6 +514,18 @@ public class ResourceLocalizationService extends CompositeService
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
}
private void submitDirForDeletion(String userName, Path dir) {
try {
lfs.getFileStatus(dir);
delService.delete(userName, dir, new Path[] {});
} catch (UnsupportedFileSystemException ue) {
LOG.warn("Local dir " + dir + " is an unsupported filesystem", ue);
} catch (IOException ie) {
// ignore
return;
}
}
@SuppressWarnings({"unchecked"})
private void handleDestroyApplicationResources(Application application) {
@ -545,19 +551,22 @@ public class ResourceLocalizationService extends CompositeService
}
// Delete the application directories
for (String localDir : dirsHandler.getLocalDirs()) {
userName = application.getUser();
appIDStr = application.toString();
for (String localDir : dirsHandler.getLocalDirsForCleanup()) {
// Delete the user-owned app-dir
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usersdir, userName);
Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
Path appDir = new Path(allAppsdir, appIDStr);
delService.delete(userName, appDir, new Path[] {});
submitDirForDeletion(userName, appDir);
// Delete the nmPrivate app-dir
Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
Path appSysDir = new Path(sysDir, appIDStr);
delService.delete(null, appSysDir, new Path[] {});
submitDirForDeletion(null, appSysDir);
}
// TODO: decrement reference counts of all resources associated with this
@ -868,7 +877,7 @@ public class ResourceLocalizationService extends CompositeService
/**
* Find next resource to be given to a spawned localizer.
*
* @return
* @return the next resource to be localized
*/
private LocalResource findNextResource() {
synchronized (pending) {
@ -1071,8 +1080,8 @@ public class ResourceLocalizationService extends CompositeService
// 1) write credentials to private dir
writeCredentials(nmPrivateCTokensPath);
// 2) exec initApplication and wait
List<String> localDirs = dirsHandler.getLocalDirs();
List<String> logDirs = dirsHandler.getLogDirs();
List<String> localDirs = getInitializedLocalDirs();
List<String> logDirs = getInitializedLogDirs();
if (dirsHandler.areDisksHealthy()) {
exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
context.getUser(),
@ -1082,7 +1091,7 @@ public class ResourceLocalizationService extends CompositeService
localizerId, localDirs, logDirs);
} else {
throw new IOException("All disks failed. "
+ dirsHandler.getDisksHealthReport());
+ dirsHandler.getDisksHealthReport(false));
}
// TODO handle ExitCodeException separately?
} catch (Exception e) {
@ -1151,9 +1160,81 @@ public class ResourceLocalizationService extends CompositeService
}
private void cleanUpLocalDir(FileContext lfs, DeletionService del) {
long currentTimeStamp = System.currentTimeMillis();
private void initializeLocalDirs(FileContext lfs) {
List<String> localDirs = dirsHandler.getLocalDirs();
for (String localDir : localDirs) {
initializeLocalDir(lfs, localDir);
}
}
private void initializeLocalDir(FileContext lfs, String localDir) {
Map<Path, FsPermission> pathPermissionMap = getLocalDirsPathPermissionsMap(localDir);
for (Map.Entry<Path, FsPermission> entry : pathPermissionMap.entrySet()) {
FileStatus status;
try {
status = lfs.getFileStatus(entry.getKey());
}
catch(FileNotFoundException fs) {
status = null;
}
catch(IOException ie) {
String msg = "Could not get file status for local dir " + entry.getKey();
LOG.warn(msg, ie);
throw new YarnRuntimeException(msg, ie);
}
if(status == null) {
try {
lfs.mkdir(entry.getKey(), entry.getValue(), true);
status = lfs.getFileStatus(entry.getKey());
} catch (IOException e) {
String msg = "Could not initialize local dir " + entry.getKey();
LOG.warn(msg, e);
throw new YarnRuntimeException(msg, e);
}
}
FsPermission perms = status.getPermission();
if(!perms.equals(entry.getValue())) {
try {
lfs.setPermission(entry.getKey(), entry.getValue());
}
catch(IOException ie) {
String msg = "Could not set permissions for local dir " + entry.getKey();
LOG.warn(msg, ie);
throw new YarnRuntimeException(msg, ie);
}
}
}
}
private void initializeLogDirs(FileContext lfs) {
List<String> logDirs = dirsHandler.getLogDirs();
for (String logDir : logDirs) {
initializeLogDir(lfs, logDir);
}
}
private void initializeLogDir(FileContext lfs, String logDir) {
try {
lfs.mkdir(new Path(logDir), null, true);
} catch (FileAlreadyExistsException fe) {
// do nothing
} catch (IOException e) {
String msg = "Could not initialize log dir " + logDir;
LOG.warn(msg, e);
throw new YarnRuntimeException(msg, e);
}
}
private void cleanUpLocalDirs(FileContext lfs, DeletionService del) {
for (String localDir : dirsHandler.getLocalDirs()) {
cleanUpLocalDir(lfs, del, localDir);
}
}
private void cleanUpLocalDir(FileContext lfs, DeletionService del,
String localDir) {
long currentTimeStamp = System.currentTimeMillis();
renameLocalDir(lfs, localDir, ContainerLocalizer.USERCACHE,
currentTimeStamp);
renameLocalDir(lfs, localDir, ContainerLocalizer.FILECACHE,
@ -1167,7 +1248,6 @@ public class ResourceLocalizationService extends CompositeService
LOG.warn("Failed to delete localDir: " + localDir);
}
}
}
private void renameLocalDir(FileContext lfs, String localDir,
String localSubDir, long currentTimeStamp) {
@ -1235,4 +1315,94 @@ public class ResourceLocalizationService extends CompositeService
}
}
/**
* Synchronized method to get a list of initialized local dirs. Method will
* check each local dir to ensure it has been setup correctly and will attempt
* to fix any issues it finds.
*
* @return list of initialized local dirs
*/
synchronized private List<String> getInitializedLocalDirs() {
List<String> dirs = dirsHandler.getLocalDirs();
List<String> checkFailedDirs = new ArrayList<String>();
for (String dir : dirs) {
try {
checkLocalDir(dir);
} catch (YarnRuntimeException e) {
checkFailedDirs.add(dir);
}
}
for (String dir : checkFailedDirs) {
LOG.info("Attempting to initialize " + dir);
initializeLocalDir(lfs, dir);
try {
checkLocalDir(dir);
} catch (YarnRuntimeException e) {
String msg =
"Failed to setup local dir " + dir + ", which was marked as good.";
LOG.warn(msg, e);
throw new YarnRuntimeException(msg, e);
}
}
return dirs;
}
private boolean checkLocalDir(String localDir) {
Map<Path, FsPermission> pathPermissionMap = getLocalDirsPathPermissionsMap(localDir);
for (Map.Entry<Path, FsPermission> entry : pathPermissionMap.entrySet()) {
FileStatus status;
try {
status = lfs.getFileStatus(entry.getKey());
} catch (Exception e) {
String msg =
"Could not carry out resource dir checks for " + localDir
+ ", which was marked as good";
LOG.warn(msg, e);
throw new YarnRuntimeException(msg, e);
}
if (!status.getPermission().equals(entry.getValue())) {
String msg =
"Permissions incorrectly set for dir " + entry.getKey()
+ ", should be " + entry.getValue() + ", actual value = "
+ status.getPermission();
LOG.warn(msg);
throw new YarnRuntimeException(msg);
}
}
return true;
}
private Map<Path, FsPermission> getLocalDirsPathPermissionsMap(String localDir) {
Map<Path, FsPermission> localDirPathFsPermissionsMap = new HashMap<Path, FsPermission>();
FsPermission defaultPermission =
FsPermission.getDirDefault().applyUMask(lfs.getUMask());
FsPermission nmPrivatePermission =
NM_PRIVATE_PERM.applyUMask(lfs.getUMask());
Path userDir = new Path(localDir, ContainerLocalizer.USERCACHE);
Path fileDir = new Path(localDir, ContainerLocalizer.FILECACHE);
Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
localDirPathFsPermissionsMap.put(userDir, defaultPermission);
localDirPathFsPermissionsMap.put(fileDir, defaultPermission);
localDirPathFsPermissionsMap.put(sysDir, nmPrivatePermission);
return localDirPathFsPermissionsMap;
}
/**
* Synchronized method to get a list of initialized log dirs. Method will
* check each local dir to ensure it has been setup correctly and will attempt
* to fix any issues it finds.
*
* @return list of initialized log dirs
*/
synchronized private List<String> getInitializedLogDirs() {
List<String> dirs = dirsHandler.getLogDirs();
initializeLogDirs(lfs);
return dirs;
}
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregatio
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
@ -37,9 +38,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -107,6 +110,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
private final AtomicBoolean appAggregationFinished = new AtomicBoolean();
private final AtomicBoolean aborted = new AtomicBoolean();
private final Map<ApplicationAccessType, String> appAcls;
private final FileContext lfs;
private final LogAggregationContext logAggregationContext;
private final Context context;
private final int retentionSize;
@ -122,8 +126,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
ContainerLogsRetentionPolicy retentionPolicy,
Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext,
Context context) {
LogAggregationContext logAggregationContext, Context context,
FileContext lfs) {
this.dispatcher = dispatcher;
this.conf = conf;
this.delService = deletionService;
@ -136,6 +140,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
this.retentionPolicy = retentionPolicy;
this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
this.appAcls = appAcls;
this.lfs = lfs;
this.logAggregationContext = logAggregationContext;
this.context = context;
this.nodeId = nodeId;
@ -395,15 +400,25 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
uploadLogsForContainers();
// Remove the local app-log-dirs
List<String> rootLogDirs = dirsHandler.getLogDirs();
Path[] localAppLogDirs = new Path[rootLogDirs.size()];
int index = 0;
for (String rootLogDir : rootLogDirs) {
localAppLogDirs[index] = new Path(rootLogDir, this.applicationId);
index++;
List<Path> localAppLogDirs = new ArrayList<Path>();
for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) {
Path logPath = new Path(rootLogDir, applicationId);
try {
// check if log dir exists
lfs.getFileStatus(logPath);
localAppLogDirs.add(logPath);
} catch (UnsupportedFileSystemException ue) {
LOG.warn("Log dir " + rootLogDir + "is an unsupported file system", ue);
continue;
} catch (IOException fe) {
continue;
}
}
if (localAppLogDirs.size() > 0) {
this.delService.delete(this.userUgi.getShortUserName(), null,
localAppLogDirs);
localAppLogDirs.toArray(new Path[localAppLogDirs.size()]));
}
this.dispatcher.getEventHandler().handle(
new ApplicationEvent(this.appId,

View File

@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -327,6 +328,15 @@ public class LogAggregationService extends AbstractService implements
this.dispatcher.getEventHandler().handle(eventResponse);
}
FileContext getLocalFileContext(Configuration conf) {
try {
return FileContext.getLocalFSFileContext(conf);
} catch (IOException e) {
throw new YarnRuntimeException("Failed to access local fs");
}
}
protected void initAppAggregator(final ApplicationId appId, String user,
Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
Map<ApplicationAccessType, String> appAcls,
@ -344,7 +354,8 @@ public class LogAggregationService extends AbstractService implements
new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
getConfig(), appId, userUgi, this.nodeId, dirsHandler,
getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
appAcls, logAggregationContext, this.context);
appAcls, logAggregationContext, this.context,
getLocalFileContext(getConfig()));
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
throw new YarnRuntimeException("Duplicate initApp for " + appId);
}

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -28,11 +30,14 @@ import java.util.concurrent.RejectedExecutionException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@ -97,6 +102,15 @@ public class NonAggregatingLogHandler extends AbstractService implements
super.serviceStop();
}
FileContext getLocalFileContext(Configuration conf) {
try {
return FileContext.getLocalFSFileContext(conf);
} catch (IOException e) {
throw new YarnRuntimeException("Failed to access local fs");
}
}
@SuppressWarnings("unchecked")
@Override
public void handle(LogHandlerEvent event) {
@ -160,21 +174,30 @@ public class NonAggregatingLogHandler extends AbstractService implements
@Override
@SuppressWarnings("unchecked")
public void run() {
List<String> rootLogDirs =
NonAggregatingLogHandler.this.dirsHandler.getLogDirs();
Path[] localAppLogDirs = new Path[rootLogDirs.size()];
int index = 0;
for (String rootLogDir : rootLogDirs) {
localAppLogDirs[index] = new Path(rootLogDir, applicationId.toString());
index++;
List<Path> localAppLogDirs = new ArrayList<Path>();
FileContext lfs = getLocalFileContext(getConfig());
for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) {
Path logDir = new Path(rootLogDir, applicationId.toString());
try {
lfs.getFileStatus(logDir);
localAppLogDirs.add(logDir);
} catch (UnsupportedFileSystemException ue) {
LOG.warn("Unsupported file system used for log dir " + logDir, ue);
continue;
} catch (IOException ie) {
continue;
}
}
// Inform the application before the actual delete itself, so that links
// to logs will no longer be there on NM web-UI.
NonAggregatingLogHandler.this.dispatcher.getEventHandler().handle(
new ApplicationEvent(this.applicationId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
if (localAppLogDirs.size() > 0) {
NonAggregatingLogHandler.this.delService.delete(user, null,
localAppLogDirs);
(Path[]) localAppLogDirs.toArray(new Path[localAppLogDirs.size()]));
}
}
@Override

View File

@ -34,6 +34,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.Test;
public class TestDirectoryCollection {
@ -42,8 +44,13 @@ public class TestDirectoryCollection {
TestDirectoryCollection.class.getName()).getAbsoluteFile();
private static final File testFile = new File(testDir, "testfile");
private Configuration conf;
private FileContext localFs;
@Before
public void setup() throws IOException {
public void setupForTests() throws IOException {
conf = new Configuration();
localFs = FileContext.getLocalFSFileContext(conf);
testDir.mkdirs();
testFile.createNewFile();
}
@ -56,10 +63,11 @@ public class TestDirectoryCollection {
@Test
public void testConcurrentAccess() throws IOException {
// Initialize DirectoryCollection with a file instead of a directory
Configuration conf = new Configuration();
String[] dirs = {testFile.getPath()};
DirectoryCollection dc = new DirectoryCollection(dirs,
conf.getFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
DirectoryCollection dc =
new DirectoryCollection(dirs, conf.getFloat(
YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE));
// Create an iterator before checkDirs is called to reliable test case
@ -78,9 +86,8 @@ public class TestDirectoryCollection {
@Test
public void testCreateDirectories() throws IOException {
Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
FileContext localFs = FileContext.getLocalFSFileContext(conf);
String dirA = new File(testDir, "dirA").getPath();
String dirB = new File(dirA, "dirB").getPath();
@ -92,8 +99,9 @@ public class TestDirectoryCollection {
localFs.setPermission(pathC, permDirC);
String[] dirs = { dirA, dirB, dirC };
DirectoryCollection dc = new DirectoryCollection(dirs,
conf.getFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
DirectoryCollection dc =
new DirectoryCollection(dirs, conf.getFloat(
YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE));
FsPermission defaultPerm = FsPermission.getDefault()
.applyUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK));
@ -120,25 +128,29 @@ public class TestDirectoryCollection {
dc.checkDirs();
Assert.assertEquals(0, dc.getGoodDirs().size());
Assert.assertEquals(1, dc.getFailedDirs().size());
Assert.assertEquals(1, dc.getFullDirs().size());
dc = new DirectoryCollection(dirs, 100.0F);
dc.checkDirs();
Assert.assertEquals(1, dc.getGoodDirs().size());
Assert.assertEquals(0, dc.getFailedDirs().size());
Assert.assertEquals(0, dc.getFullDirs().size());
dc = new DirectoryCollection(dirs, testDir.getTotalSpace() / (1024 * 1024));
dc.checkDirs();
Assert.assertEquals(0, dc.getGoodDirs().size());
Assert.assertEquals(1, dc.getFailedDirs().size());
Assert.assertEquals(1, dc.getFullDirs().size());
dc = new DirectoryCollection(dirs, 100.0F, 0);
dc.checkDirs();
Assert.assertEquals(1, dc.getGoodDirs().size());
Assert.assertEquals(0, dc.getFailedDirs().size());
Assert.assertEquals(0, dc.getFullDirs().size());
}
@Test
public void testDiskLimitsCutoffSetters() {
public void testDiskLimitsCutoffSetters() throws IOException {
String[] dirs = { "dir" };
DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F, 100);
@ -162,6 +174,47 @@ public class TestDirectoryCollection {
Assert.assertEquals(0, dc.getDiskUtilizationSpaceCutoff());
}
@Test
public void testFailedDisksBecomingGoodAgain() throws Exception {
String dirA = new File(testDir, "dirA").getPath();
String[] dirs = { dirA };
DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F);
dc.checkDirs();
Assert.assertEquals(0, dc.getGoodDirs().size());
Assert.assertEquals(1, dc.getFailedDirs().size());
Assert.assertEquals(1, dc.getFullDirs().size());
dc.setDiskUtilizationPercentageCutoff(100.0F);
dc.checkDirs();
Assert.assertEquals(1, dc.getGoodDirs().size());
Assert.assertEquals(0, dc.getFailedDirs().size());
Assert.assertEquals(0, dc.getFullDirs().size());
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
String dirB = new File(testDir, "dirB").getPath();
Path pathB = new Path(dirB);
FsPermission permDirB = new FsPermission((short) 0400);
localFs.mkdir(pathB, null, true);
localFs.setPermission(pathB, permDirB);
String[] dirs2 = { dirB };
dc = new DirectoryCollection(dirs2, 100.0F);
dc.checkDirs();
Assert.assertEquals(0, dc.getGoodDirs().size());
Assert.assertEquals(1, dc.getFailedDirs().size());
Assert.assertEquals(0, dc.getFullDirs().size());
permDirB = new FsPermission((short) 0700);
localFs.setPermission(pathB, permDirB);
dc.checkDirs();
Assert.assertEquals(1, dc.getGoodDirs().size());
Assert.assertEquals(0, dc.getFailedDirs().size());
Assert.assertEquals(0, dc.getFullDirs().size());
}
@Test
public void testConstructors() {

View File

@ -21,8 +21,13 @@ package org.apache.hadoop.yarn.server.nodemanager;
import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -57,10 +62,11 @@ public class TestLocalDirsHandlerService {
LocalDirsHandlerService dirSvc = new LocalDirsHandlerService();
dirSvc.init(conf);
Assert.assertEquals(1, dirSvc.getLocalDirs().size());
dirSvc.close();
}
@Test
public void testValidPathsDirHandlerService() {
public void testValidPathsDirHandlerService() throws Exception {
Configuration conf = new YarnConfiguration();
String localDir1 = new File("file:///" + testDir, "localDir1").getPath();
String localDir2 = new File("hdfs:///" + testDir, "localDir2").getPath();
@ -76,5 +82,40 @@ public class TestLocalDirsHandlerService {
Assert.assertEquals("Service should not be inited",
STATE.STOPPED,
dirSvc.getServiceState());
dirSvc.close();
}
@Test
public void testGetFullDirs() throws Exception {
Configuration conf = new YarnConfiguration();
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
FileContext localFs = FileContext.getLocalFSFileContext(conf);
String localDir1 = new File(testDir, "localDir1").getPath();
String localDir2 = new File(testDir, "localDir2").getPath();
String logDir1 = new File(testDir, "logDir1").getPath();
String logDir2 = new File(testDir, "logDir2").getPath();
Path localDir1Path = new Path(localDir1);
Path logDir1Path = new Path(logDir1);
FsPermission dirPermissions = new FsPermission((short) 0410);
localFs.mkdir(localDir1Path, dirPermissions, true);
localFs.mkdir(logDir1Path, dirPermissions, true);
conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir1 + "," + localDir2);
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir1 + "," + logDir2);
conf.setFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
0.0f);
LocalDirsHandlerService dirSvc = new LocalDirsHandlerService();
dirSvc.init(conf);
Assert.assertEquals(0, dirSvc.getLocalDirs().size());
Assert.assertEquals(0, dirSvc.getLogDirs().size());
Assert.assertEquals(1, dirSvc.getDiskFullLocalDirs().size());
Assert.assertEquals(1, dirSvc.getDiskFullLogDirs().size());
FileUtils.deleteDirectory(new File(localDir1));
FileUtils.deleteDirectory(new File(localDir2));
FileUtils.deleteDirectory(new File(logDir1));
FileUtils.deleteDirectory(new File(logDir1));
dirSvc.close();
}
}

View File

@ -196,7 +196,7 @@ public class TestNodeHealthService {
healthStatus.getHealthReport().equals(
NodeHealthScriptRunner.NODE_HEALTH_SCRIPT_TIMED_OUT_MSG
+ NodeHealthCheckerService.SEPARATOR
+ nodeHealthChecker.getDiskHandler().getDisksHealthReport()));
+ nodeHealthChecker.getDiskHandler().getDisksHealthReport(false)));
}
}

View File

@ -42,6 +42,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -68,13 +69,16 @@ import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.security.AccessControlException;
import org.junit.Assert;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
@ -167,15 +171,15 @@ public class TestResourceLocalizationService {
conf = new Configuration();
spylfs = spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
lfs = FileContext.getFileContext(spylfs, conf);
doNothing().when(spylfs).mkdir(
isA(Path.class), isA(FsPermission.class), anyBoolean());
String logDir = lfs.makeQualified(new Path(basedir, "logdir ")).toString();
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
}
@After
public void cleanup() {
public void cleanup() throws IOException {
conf = null;
FileUtils.deleteDirectory(new File(basedir.toString()));
}
@Test
@ -752,6 +756,39 @@ public class TestResourceLocalizationService {
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
FsPermission defaultPermission =
FsPermission.getDirDefault().applyUMask(lfs.getUMask());
FsPermission nmPermission =
ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask());
final Path userDir =
new Path(sDirs[0].substring("file:".length()),
ContainerLocalizer.USERCACHE);
final Path fileDir =
new Path(sDirs[0].substring("file:".length()),
ContainerLocalizer.FILECACHE);
final Path sysDir =
new Path(sDirs[0].substring("file:".length()),
ResourceLocalizationService.NM_PRIVATE_DIR);
final FileStatus fs =
new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
defaultPermission, "", "", new Path(sDirs[0]));
final FileStatus nmFs =
new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
nmPermission, "", "", sysDir);
doAnswer(new Answer<FileStatus>() {
@Override
public FileStatus answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
if (args.length > 0) {
if (args[0].equals(userDir) || args[0].equals(fileDir)) {
return fs;
}
}
return nmFs;
}
}).when(spylfs).getFileStatus(isA(Path.class));
try {
spyService.init(conf);
spyService.start();
@ -1776,4 +1813,273 @@ public class TestResourceLocalizationService {
new Text("kind" + id), new Text("service" + id));
}
/*
* Test to ensure ResourceLocalizationService can handle local dirs going bad.
* Test first sets up all the components required, then sends events to fetch
* a private, app and public resource. It then sends events to clean up the
* container and the app and ensures the right delete calls were made.
*/
@Test
@SuppressWarnings("unchecked")
// mocked generics
public void testFailedDirsResourceRelease() throws Exception {
// setup components
File f = new File(basedir.toString());
String[] sDirs = new String[4];
List<Path> localDirs = new ArrayList<Path>(sDirs.length);
for (int i = 0; i < 4; ++i) {
sDirs[i] = f.getAbsolutePath() + i;
localDirs.add(new Path(sDirs[i]));
}
List<Path> containerLocalDirs = new ArrayList<Path>(localDirs.size());
List<Path> appLocalDirs = new ArrayList<Path>(localDirs.size());
List<Path> nmLocalContainerDirs = new ArrayList<Path>(localDirs.size());
List<Path> nmLocalAppDirs = new ArrayList<Path>(localDirs.size());
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
conf.setLong(YarnConfiguration.NM_DISK_HEALTH_CHECK_INTERVAL_MS, 500);
LocalizerTracker mockLocallilzerTracker = mock(LocalizerTracker.class);
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(conf);
dispatcher.start();
EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, applicationBus);
EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
dispatcher.register(ContainerEventType.class, containerBus);
// Ignore actual localization
EventHandler<LocalizerEvent> localizerBus = mock(EventHandler.class);
dispatcher.register(LocalizerEventType.class, localizerBus);
ContainerExecutor exec = mock(ContainerExecutor.class);
LocalDirsHandlerService mockDirsHandler =
mock(LocalDirsHandlerService.class);
doReturn(new ArrayList<String>(Arrays.asList(sDirs))).when(
mockDirsHandler).getLocalDirsForCleanup();
DeletionService delService = mock(DeletionService.class);
// setup mocks
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
mockDirsHandler, new NMNullStateStoreService());
ResourceLocalizationService spyService = spy(rawService);
doReturn(mockServer).when(spyService).createServer();
doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
isA(Configuration.class));
doReturn(lfs).when(spyService)
.getLocalFileContext(isA(Configuration.class));
FsPermission defaultPermission =
FsPermission.getDirDefault().applyUMask(lfs.getUMask());
FsPermission nmPermission =
ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask());
final FileStatus fs =
new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
defaultPermission, "", "", localDirs.get(0));
final FileStatus nmFs =
new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
nmPermission, "", "", localDirs.get(0));
final String user = "user0";
// init application
final Application app = mock(Application.class);
final ApplicationId appId =
BuilderUtils.newApplicationId(314159265358979L, 3);
when(app.getUser()).thenReturn(user);
when(app.getAppId()).thenReturn(appId);
when(app.toString()).thenReturn(ConverterUtils.toString(appId));
// init container.
final Container c = getMockContainer(appId, 42, user);
// setup local app dirs
List<String> tmpDirs = mockDirsHandler.getLocalDirs();
for (int i = 0; i < tmpDirs.size(); ++i) {
Path usersdir = new Path(tmpDirs.get(i), ContainerLocalizer.USERCACHE);
Path userdir = new Path(usersdir, user);
Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
Path appDir = new Path(allAppsdir, ConverterUtils.toString(appId));
Path containerDir =
new Path(appDir, ConverterUtils.toString(c.getContainerId()));
containerLocalDirs.add(containerDir);
appLocalDirs.add(appDir);
Path sysDir =
new Path(tmpDirs.get(i), ResourceLocalizationService.NM_PRIVATE_DIR);
Path appSysDir = new Path(sysDir, ConverterUtils.toString(appId));
Path containerSysDir =
new Path(appSysDir, ConverterUtils.toString(c.getContainerId()));
nmLocalContainerDirs.add(containerSysDir);
nmLocalAppDirs.add(appSysDir);
}
try {
spyService.init(conf);
spyService.start();
spyService.handle(new ApplicationLocalizationEvent(
LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
dispatcher.await();
// Get a handle on the trackers after they're setup with
// INIT_APP_RESOURCES
LocalResourcesTracker appTracker =
spyService.getLocalResourcesTracker(
LocalResourceVisibility.APPLICATION, user, appId);
LocalResourcesTracker privTracker =
spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
user, appId);
LocalResourcesTracker pubTracker =
spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
user, appId);
// init resources
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
// Send localization requests, one for each type of resource
final LocalResource privResource = getPrivateMockedResource(r);
final LocalResourceRequest privReq =
new LocalResourceRequest(privResource);
final LocalResource appResource = getAppMockedResource(r);
final LocalResourceRequest appReq = new LocalResourceRequest(appResource);
final LocalResource pubResource = getPublicMockedResource(r);
final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
req.put(LocalResourceVisibility.PRIVATE,
Collections.singletonList(privReq));
req.put(LocalResourceVisibility.APPLICATION,
Collections.singletonList(appReq));
req
.put(LocalResourceVisibility.PUBLIC, Collections.singletonList(pubReq));
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req2 =
new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
req2.put(LocalResourceVisibility.PRIVATE,
Collections.singletonList(privReq));
// Send Request event
spyService.handle(new ContainerLocalizationRequestEvent(c, req));
spyService.handle(new ContainerLocalizationRequestEvent(c, req2));
dispatcher.await();
int privRsrcCount = 0;
for (LocalizedResource lr : privTracker) {
privRsrcCount++;
Assert.assertEquals("Incorrect reference count", 2, lr.getRefCount());
Assert.assertEquals(privReq, lr.getRequest());
}
Assert.assertEquals(1, privRsrcCount);
int appRsrcCount = 0;
for (LocalizedResource lr : appTracker) {
appRsrcCount++;
Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
Assert.assertEquals(appReq, lr.getRequest());
}
Assert.assertEquals(1, appRsrcCount);
int pubRsrcCount = 0;
for (LocalizedResource lr : pubTracker) {
pubRsrcCount++;
Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
Assert.assertEquals(pubReq, lr.getRequest());
}
Assert.assertEquals(1, pubRsrcCount);
// setup mocks for test, a set of dirs with IOExceptions and let the rest
// go through
for (int i = 0; i < containerLocalDirs.size(); ++i) {
if (i == 2) {
Mockito.doThrow(new IOException()).when(spylfs)
.getFileStatus(eq(containerLocalDirs.get(i)));
Mockito.doThrow(new IOException()).when(spylfs)
.getFileStatus(eq(nmLocalContainerDirs.get(i)));
} else {
doReturn(fs).when(spylfs)
.getFileStatus(eq(containerLocalDirs.get(i)));
doReturn(nmFs).when(spylfs).getFileStatus(
eq(nmLocalContainerDirs.get(i)));
}
}
// Send Cleanup Event
spyService.handle(new ContainerLocalizationCleanupEvent(c, req));
verify(mockLocallilzerTracker).cleanupPrivLocalizers(
"container_314159265358979_0003_01_000042");
// match cleanup events with the mocks we setup earlier
for (int i = 0; i < containerLocalDirs.size(); ++i) {
if (i == 2) {
try {
verify(delService).delete(user, containerLocalDirs.get(i));
verify(delService).delete(null, nmLocalContainerDirs.get(i));
Assert.fail("deletion attempts for invalid dirs");
} catch (Throwable e) {
continue;
}
} else {
verify(delService).delete(user, containerLocalDirs.get(i));
verify(delService).delete(null, nmLocalContainerDirs.get(i));
}
}
ArgumentMatcher<ApplicationEvent> matchesAppDestroy =
new ArgumentMatcher<ApplicationEvent>() {
@Override
public boolean matches(Object o) {
ApplicationEvent evt = (ApplicationEvent) o;
return (evt.getType() == ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP)
&& appId == evt.getApplicationID();
}
};
dispatcher.await();
// setup mocks again, this time throw UnsupportedFileSystemException and
// IOExceptions
for (int i = 0; i < containerLocalDirs.size(); ++i) {
if (i == 3) {
Mockito.doThrow(new IOException()).when(spylfs)
.getFileStatus(eq(appLocalDirs.get(i)));
Mockito.doThrow(new UnsupportedFileSystemException("test"))
.when(spylfs).getFileStatus(eq(nmLocalAppDirs.get(i)));
} else {
doReturn(fs).when(spylfs).getFileStatus(eq(appLocalDirs.get(i)));
doReturn(nmFs).when(spylfs).getFileStatus(eq(nmLocalAppDirs.get(i)));
}
}
LocalizationEvent destroyApp =
new ApplicationLocalizationEvent(
LocalizationEventType.DESTROY_APPLICATION_RESOURCES, app);
spyService.handle(destroyApp);
verify(applicationBus).handle(argThat(matchesAppDestroy));
// verify we got the right delete calls
for (int i = 0; i < containerLocalDirs.size(); ++i) {
if (i == 3) {
try {
verify(delService).delete(user, containerLocalDirs.get(i));
verify(delService).delete(null, nmLocalContainerDirs.get(i));
Assert.fail("deletion attempts for invalid dirs");
} catch (Throwable e) {
continue;
}
} else {
verify(delService).delete(user, appLocalDirs.get(i));
verify(delService).delete(null, nmLocalAppDirs.get(i));
}
}
} finally {
dispatcher.stop();
delService.stop();
}
}
}

View File

@ -25,6 +25,7 @@ import static org.mockito.Matchers.anyMap;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -58,6 +59,8 @@ import org.junit.Assert;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -105,6 +108,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerM
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.TestNonAggregatingLogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
@ -137,11 +141,18 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
this.remoteRootLogDir.mkdir();
}
DrainDispatcher dispatcher;
EventHandler<ApplicationEvent> appEventHandler;
@Override
@SuppressWarnings("unchecked")
public void setup() throws IOException {
super.setup();
NodeId nodeId = NodeId.newInstance("0.0.0.0", 5555);
((NMContext)context).setNodeId(nodeId);
dispatcher = createDispatcher();
appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
}
@Override
@ -149,10 +160,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
super.tearDown();
createContainerExecutor().deleteAsUser(user,
new Path(remoteRootLogDir.getAbsolutePath()), new Path[] {});
dispatcher.await();
dispatcher.stop();
dispatcher.close();
}
@Test
@SuppressWarnings("unchecked")
public void testLocalFileDeletionAfterUpload() throws Exception {
this.delSrvc = new DeletionService(createContainerExecutor());
delSrvc = spy(delSrvc);
@ -161,10 +174,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
DrainDispatcher dispatcher = createDispatcher();
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
LogAggregationService logAggregationService = spy(
new LogAggregationService(dispatcher, this.context, this.delSrvc,
super.dirsHandler));
@ -236,16 +245,11 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
}
@Test
@SuppressWarnings("unchecked")
public void testNoContainerOnNode() throws Exception {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
DrainDispatcher dispatcher = createDispatcher();
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
LogAggregationService logAggregationService =
new LogAggregationService(dispatcher, this.context, this.delSrvc,
super.dirsHandler);
@ -285,6 +289,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
};
checkEvents(appEventHandler, expectedEvents, true, "getType", "getApplicationID");
dispatcher.stop();
logAggregationService.close();
}
@Test
@ -294,6 +299,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
String[] fileNames = new String[] { "stdout", "stderr", "syslog" };
DrainDispatcher dispatcher = createDispatcher();
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
@ -432,17 +438,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
}
@Test
@SuppressWarnings("unchecked")
public void testVerifyAndCreateRemoteDirsFailure()
throws Exception {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
DrainDispatcher dispatcher = createDispatcher();
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
LogAggregationService logAggregationService = spy(
new LogAggregationService(dispatcher, this.context, this.delSrvc,
super.dirsHandler));
@ -456,8 +457,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
logAggregationService.start();
// Now try to start an application
ApplicationId appId = BuilderUtils.newApplicationId(
System.currentTimeMillis(), (int)Math.random());
ApplicationId appId =
BuilderUtils.newApplicationId(System.currentTimeMillis(),
(int) (Math.random() * 1000));
logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
this.user, null,
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
@ -475,8 +477,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
Mockito.reset(logAggregationService);
// Now try to start another one
ApplicationId appId2 = BuilderUtils.newApplicationId(
System.currentTimeMillis(), (int)Math.random());
ApplicationId appId2 =
BuilderUtils.newApplicationId(System.currentTimeMillis(),
(int) (Math.random() * 1000));
File appLogDir =
new File(localLogDir, ConverterUtils.toString(appId2));
appLogDir.mkdir();
@ -578,6 +581,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
verify(spyFs, never()).mkdirs(eq(appDir3), isA(FsPermission.class));
aggSvc.stop();
aggSvc.close();
}
@Test
@ -589,18 +594,15 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
DrainDispatcher dispatcher = createDispatcher();
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
LogAggregationService logAggregationService = spy(
new LogAggregationService(dispatcher, this.context, this.delSrvc,
super.dirsHandler));
logAggregationService.init(this.conf);
logAggregationService.start();
ApplicationId appId = BuilderUtils.newApplicationId(
System.currentTimeMillis(), (int)Math.random());
ApplicationId appId =
BuilderUtils.newApplicationId(System.currentTimeMillis(),
(int) (Math.random() * 1000));
doThrow(new YarnRuntimeException("KABOOM!"))
.when(logAggregationService).initAppAggregator(
eq(appId), eq(user), any(Credentials.class),
@ -634,7 +636,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
}
@Test
@SuppressWarnings("unchecked")
public void testLogAggregationCreateDirsFailsWithoutKillingNM()
throws Exception {
@ -642,18 +643,15 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
DrainDispatcher dispatcher = createDispatcher();
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
LogAggregationService logAggregationService = spy(
new LogAggregationService(dispatcher, this.context, this.delSrvc,
super.dirsHandler));
logAggregationService.init(this.conf);
logAggregationService.start();
ApplicationId appId = BuilderUtils.newApplicationId(
System.currentTimeMillis(), (int)Math.random());
ApplicationId appId =
BuilderUtils.newApplicationId(System.currentTimeMillis(),
(int) (Math.random() * 1000));
Exception e = new RuntimeException("KABOOM!");
doThrow(e)
.when(logAggregationService).createAppDir(any(String.class),
@ -905,7 +903,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
}
@Test(timeout=20000)
@SuppressWarnings("unchecked")
public void testStopAfterError() throws Exception {
DeletionService delSrvc = mock(DeletionService.class);
@ -913,10 +910,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class);
when(mockedDirSvc.getLogDirs()).thenThrow(new RuntimeException());
DrainDispatcher dispatcher = createDispatcher();
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
LogAggregationService logAggregationService =
new LogAggregationService(dispatcher, this.context, delSrvc,
mockedDirSvc);
@ -930,20 +923,16 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
logAggregationService.stop();
assertEquals(0, logAggregationService.getNumAggregators());
logAggregationService.close();
}
@Test
@SuppressWarnings("unchecked")
public void testLogAggregatorCleanup() throws Exception {
DeletionService delSrvc = mock(DeletionService.class);
// get the AppLogAggregationImpl thread to crash
LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class);
DrainDispatcher dispatcher = createDispatcher();
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
LogAggregationService logAggregationService =
new LogAggregationService(dispatcher, this.context, delSrvc,
mockedDirSvc);
@ -964,6 +953,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
}
Assert.assertEquals("Log aggregator failed to cleanup!", 0,
logAggregationService.getNumAggregators());
logAggregationService.stop();
logAggregationService.close();
}
@SuppressWarnings("unchecked")
@ -1039,6 +1030,72 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
return sb.toString();
}
/*
* Test to make sure we handle cases where the directories we get back from
* the LocalDirsHandler may have issues including the log dir not being
* present as well as other issues. The test uses helper functions from
* TestNonAggregatingLogHandler.
*/
@Test
public void testFailedDirsLocalFileDeletionAfterUpload() throws Exception {
// setup conf and services
DeletionService mockDelService = mock(DeletionService.class);
File[] localLogDirs =
TestNonAggregatingLogHandler.getLocalLogDirFiles(this.getClass()
.getName(), 7);
final List<String> localLogDirPaths =
new ArrayList<String>(localLogDirs.length);
for (int i = 0; i < localLogDirs.length; i++) {
localLogDirPaths.add(localLogDirs[i].getAbsolutePath());
}
String localLogDirsString = StringUtils.join(localLogDirPaths, ",");
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
this.conf.setLong(YarnConfiguration.NM_DISK_HEALTH_CHECK_INTERVAL_MS, 500);
ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(application1, 1);
this.dirsHandler = new LocalDirsHandlerService();
LocalDirsHandlerService mockDirsHandler = mock(LocalDirsHandlerService.class);
LogAggregationService logAggregationService =
spy(new LogAggregationService(dispatcher, this.context, mockDelService,
mockDirsHandler));
AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
FileContext lfs = FileContext.getFileContext(spylfs, conf);
doReturn(lfs).when(logAggregationService).getLocalFileContext(
isA(Configuration.class));
logAggregationService.init(this.conf);
logAggregationService.start();
TestNonAggregatingLogHandler.runMockedFailedDirs(logAggregationService,
application1, user, mockDelService, mockDirsHandler, conf, spylfs, lfs,
localLogDirs);
logAggregationService.stop();
assertEquals(0, logAggregationService.getNumAggregators());
verify(logAggregationService).closeFileSystems(
any(UserGroupInformation.class));
ApplicationEvent expectedEvents[] =
new ApplicationEvent[] {
new ApplicationEvent(appAttemptId.getApplicationId(),
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED),
new ApplicationEvent(appAttemptId.getApplicationId(),
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED) };
checkEvents(appEventHandler, expectedEvents, true, "getType",
"getApplicationID");
}
@Test (timeout = 50000)
@SuppressWarnings("unchecked")
public void testLogAggregationServiceWithPatterns() throws Exception {

View File

@ -19,15 +19,36 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -45,25 +66,52 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.mockito.exceptions.verification.WantedButNotInvoked;
import org.mockito.internal.matchers.VarargMatcher;
public class TestNonAggregatingLogHandler {
@Test
@SuppressWarnings("unchecked")
public void testLogDeletion() {
DeletionService delService = mock(DeletionService.class);
Configuration conf = new YarnConfiguration();
DeletionService mockDelService;
Configuration conf;
DrainDispatcher dispatcher;
EventHandler<ApplicationEvent> appEventHandler;
String user = "testuser";
ApplicationId appId;
ApplicationAttemptId appAttemptId;
ContainerId container11;
LocalDirsHandlerService dirsHandler;
File[] localLogDirs = new File[2];
localLogDirs[0] =
new File("target", this.getClass().getName() + "-localLogDir0")
.getAbsoluteFile();
localLogDirs[1] =
new File("target", this.getClass().getName() + "-localLogDir1")
.getAbsoluteFile();
@Before
@SuppressWarnings("unchecked")
public void setup() {
mockDelService = mock(DeletionService.class);
conf = new YarnConfiguration();
dispatcher = createDispatcher(conf);
appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
appId = BuilderUtils.newApplicationId(1234, 1);
appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 1);
container11 = BuilderUtils.newContainerId(appAttemptId, 1);
dirsHandler = new LocalDirsHandlerService();
}
@After
public void tearDown() throws IOException {
dirsHandler.stop();
dirsHandler.close();
dispatcher.await();
dispatcher.stop();
dispatcher.close();
}
@Test
public void testLogDeletion() throws IOException {
File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 2);
String localLogDirsString =
localLogDirs[0].getAbsolutePath() + ","
+ localLogDirs[1].getAbsolutePath();
@ -72,72 +120,50 @@ public class TestNonAggregatingLogHandler {
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 0l);
DrainDispatcher dispatcher = createDispatcher(conf);
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1);
ApplicationAttemptId appAttemptId1 =
BuilderUtils.newApplicationAttemptId(appId1, 1);
ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
NonAggregatingLogHandler rawLogHandler =
new NonAggregatingLogHandler(dispatcher, mockDelService, dirsHandler);
NonAggregatingLogHandler logHandler = spy(rawLogHandler);
AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
FileContext lfs = FileContext.getFileContext(spylfs, conf);
doReturn(lfs).when(logHandler)
.getLocalFileContext(isA(Configuration.class));
FsPermission defaultPermission =
FsPermission.getDirDefault().applyUMask(lfs.getUMask());
final FileStatus fs =
new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
defaultPermission, "", "",
new Path(localLogDirs[0].getAbsolutePath()));
doReturn(fs).when(spylfs).getFileStatus(isA(Path.class));
NonAggregatingLogHandler logHandler =
new NonAggregatingLogHandler(dispatcher, delService, dirsHandler);
logHandler.init(conf);
logHandler.start();
logHandler.handle(new LogHandlerAppStartedEvent(appId1, user, null,
logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
logHandler.handle(new LogHandlerAppFinishedEvent(appId1));
logHandler.handle(new LogHandlerAppFinishedEvent(appId));
Path[] localAppLogDirs = new Path[2];
localAppLogDirs[0] =
new Path(localLogDirs[0].getAbsolutePath(), appId1.toString());
new Path(localLogDirs[0].getAbsolutePath(), appId.toString());
localAppLogDirs[1] =
new Path(localLogDirs[1].getAbsolutePath(), appId1.toString());
new Path(localLogDirs[1].getAbsolutePath(), appId.toString());
// 5 seconds for the delete which is a separate thread.
long verifyStartTime = System.currentTimeMillis();
WantedButNotInvoked notInvokedException = null;
boolean matched = false;
while (!matched && System.currentTimeMillis() < verifyStartTime + 5000l) {
try {
verify(delService).delete(eq(user), (Path) eq(null),
eq(localAppLogDirs[0]), eq(localAppLogDirs[1]));
matched = true;
} catch (WantedButNotInvoked e) {
notInvokedException = e;
try {
Thread.sleep(50l);
} catch (InterruptedException i) {
}
}
}
if (!matched) {
throw notInvokedException;
testDeletionServiceCall(mockDelService, user, 5000, localAppLogDirs);
logHandler.close();
for (int i = 0; i < localLogDirs.length; i++) {
FileUtils.deleteDirectory(localLogDirs[i]);
}
}
@Test
@SuppressWarnings("unchecked")
public void testDelayedDelete() {
DeletionService delService = mock(DeletionService.class);
Configuration conf = new YarnConfiguration();
String user = "testuser";
File[] localLogDirs = new File[2];
localLogDirs[0] =
new File("target", this.getClass().getName() + "-localLogDir0")
.getAbsoluteFile();
localLogDirs[1] =
new File("target", this.getClass().getName() + "-localLogDir1")
.getAbsoluteFile();
public void testDelayedDelete() throws IOException {
File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 2);
String localLogDirsString =
localLogDirs[0].getAbsolutePath() + ","
+ localLogDirs[1].getAbsolutePath();
@ -148,42 +174,36 @@ public class TestNonAggregatingLogHandler {
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS,
YarnConfiguration.DEFAULT_NM_LOG_RETAIN_SECONDS);
DrainDispatcher dispatcher = createDispatcher(conf);
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
ApplicationId appId1 = BuilderUtils.newApplicationId(1234, 1);
ApplicationAttemptId appAttemptId1 =
BuilderUtils.newApplicationAttemptId(appId1, 1);
ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
NonAggregatingLogHandler logHandler =
new NonAggregatingLogHandlerWithMockExecutor(dispatcher, delService,
new NonAggregatingLogHandlerWithMockExecutor(dispatcher, mockDelService,
dirsHandler);
logHandler.init(conf);
logHandler.start();
logHandler.handle(new LogHandlerAppStartedEvent(appId1, user, null,
logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
logHandler.handle(new LogHandlerAppFinishedEvent(appId1));
logHandler.handle(new LogHandlerAppFinishedEvent(appId));
Path[] localAppLogDirs = new Path[2];
localAppLogDirs[0] =
new Path(localLogDirs[0].getAbsolutePath(), appId1.toString());
new Path(localLogDirs[0].getAbsolutePath(), appId.toString());
localAppLogDirs[1] =
new Path(localLogDirs[1].getAbsolutePath(), appId1.toString());
new Path(localLogDirs[1].getAbsolutePath(), appId.toString());
ScheduledThreadPoolExecutor mockSched =
((NonAggregatingLogHandlerWithMockExecutor) logHandler).mockSched;
verify(mockSched).schedule(any(Runnable.class), eq(10800l),
eq(TimeUnit.SECONDS));
logHandler.close();
for (int i = 0; i < localLogDirs.length; i++) {
FileUtils.deleteDirectory(localLogDirs[i]);
}
}
@Test
@ -202,25 +222,25 @@ public class TestNonAggregatingLogHandler {
verify(logHandler.mockSched)
.awaitTermination(eq(10l), eq(TimeUnit.SECONDS));
verify(logHandler.mockSched).shutdownNow();
logHandler.close();
aggregatingLogHandler.close();
}
@Test
public void testHandlingApplicationFinishedEvent() {
Configuration conf = new Configuration();
LocalDirsHandlerService dirsService = new LocalDirsHandlerService();
public void testHandlingApplicationFinishedEvent() throws IOException {
DeletionService delService = new DeletionService(null);
NonAggregatingLogHandler aggregatingLogHandler =
new NonAggregatingLogHandler(new InlineDispatcher(),
delService,
dirsService);
dirsHandler);
dirsService.init(conf);
dirsService.start();
dirsHandler.init(conf);
dirsHandler.start();
delService.init(conf);
delService.start();
aggregatingLogHandler.init(conf);
aggregatingLogHandler.start();
ApplicationId appId = BuilderUtils.newApplicationId(1234, 1);
// It should NOT throw RejectedExecutionException
aggregatingLogHandler.handle(new LogHandlerAppFinishedEvent(appId));
aggregatingLogHandler.stop();
@ -228,6 +248,7 @@ public class TestNonAggregatingLogHandler {
// It should NOT throw RejectedExecutionException after stopping
// handler service.
aggregatingLogHandler.handle(new LogHandlerAppFinishedEvent(appId));
aggregatingLogHandler.close();
}
private class NonAggregatingLogHandlerWithMockExecutor extends
@ -255,4 +276,201 @@ public class TestNonAggregatingLogHandler {
dispatcher.start();
return dispatcher;
}
/*
* Test to ensure that we handle the cleanup of directories that may not have
* the application log dirs we're trying to delete or may have other problems.
* Test creates 7 log dirs, and fails the directory check for 4 of them and
* then checks to ensure we tried to delete only the ones that passed the
* check.
*/
@Test
public void testFailedDirLogDeletion() throws Exception {
File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 7);
final List<String> localLogDirPaths =
new ArrayList<String>(localLogDirs.length);
for (int i = 0; i < localLogDirs.length; i++) {
localLogDirPaths.add(localLogDirs[i].getAbsolutePath());
}
String localLogDirsString = StringUtils.join(localLogDirPaths, ",");
conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 0l);
LocalDirsHandlerService mockDirsHandler = mock(LocalDirsHandlerService.class);
NonAggregatingLogHandler rawLogHandler =
new NonAggregatingLogHandler(dispatcher, mockDelService, mockDirsHandler);
NonAggregatingLogHandler logHandler = spy(rawLogHandler);
AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
FileContext lfs = FileContext.getFileContext(spylfs, conf);
doReturn(lfs).when(logHandler)
.getLocalFileContext(isA(Configuration.class));
logHandler.init(conf);
logHandler.start();
runMockedFailedDirs(logHandler, appId, user, mockDelService,
mockDirsHandler, conf, spylfs, lfs, localLogDirs);
logHandler.close();
}
/**
* Function to run a log handler with directories failing the getFileStatus
* call. The function accepts the log handler, setup the mocks to fail with
* specific exceptions and ensures the deletion service has the correct calls.
*
* @param logHandler the logHandler implementation to test
*
* @param appId the application id that we wish when sending events to the log
* handler
*
* @param user the user name to use
*
* @param mockDelService a mock of the DeletionService which we will verify
* the delete calls against
*
* @param dirsHandler a spy or mock on the LocalDirsHandler service used to
* when creating the logHandler. It needs to be a spy so that we can intercept
* the getAllLogDirs() call.
*
* @param conf the configuration used
*
* @param spylfs a spy on the AbstractFileSystem object used when creating lfs
*
* @param lfs the FileContext object to be used to mock the getFileStatus()
* calls
*
* @param localLogDirs list of the log dirs to run the test against, must have
* at least 7 entries
*/
public static void runMockedFailedDirs(LogHandler logHandler,
ApplicationId appId, String user, DeletionService mockDelService,
LocalDirsHandlerService dirsHandler, Configuration conf,
AbstractFileSystem spylfs, FileContext lfs, File[] localLogDirs)
throws Exception {
Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>();
if (localLogDirs.length < 7) {
throw new IllegalArgumentException(
"Argument localLogDirs must be at least of length 7");
}
Path[] localAppLogDirPaths = new Path[localLogDirs.length];
for (int i = 0; i < localAppLogDirPaths.length; i++) {
localAppLogDirPaths[i] =
new Path(localLogDirs[i].getAbsolutePath(), appId.toString());
}
final List<String> localLogDirPaths =
new ArrayList<String>(localLogDirs.length);
for (int i = 0; i < localLogDirs.length; i++) {
localLogDirPaths.add(localLogDirs[i].getAbsolutePath());
}
// setup mocks
FsPermission defaultPermission =
FsPermission.getDirDefault().applyUMask(lfs.getUMask());
final FileStatus fs =
new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
defaultPermission, "", "",
new Path(localLogDirs[0].getAbsolutePath()));
doReturn(fs).when(spylfs).getFileStatus(isA(Path.class));
doReturn(localLogDirPaths).when(dirsHandler).getLogDirsForCleanup();
logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS, appAcls));
// test case where some dirs have the log dir to delete
// mock some dirs throwing various exceptions
// verify deletion happens only on the others
Mockito.doThrow(new FileNotFoundException()).when(spylfs)
.getFileStatus(eq(localAppLogDirPaths[0]));
doReturn(fs).when(spylfs).getFileStatus(eq(localAppLogDirPaths[1]));
Mockito.doThrow(new AccessControlException()).when(spylfs)
.getFileStatus(eq(localAppLogDirPaths[2]));
doReturn(fs).when(spylfs).getFileStatus(eq(localAppLogDirPaths[3]));
Mockito.doThrow(new IOException()).when(spylfs)
.getFileStatus(eq(localAppLogDirPaths[4]));
Mockito.doThrow(new UnsupportedFileSystemException("test")).when(spylfs)
.getFileStatus(eq(localAppLogDirPaths[5]));
doReturn(fs).when(spylfs).getFileStatus(eq(localAppLogDirPaths[6]));
logHandler.handle(new LogHandlerAppFinishedEvent(appId));
testDeletionServiceCall(mockDelService, user, 5000, localAppLogDirPaths[1],
localAppLogDirPaths[3], localAppLogDirPaths[6]);
return;
}
static class DeletePathsMatcher extends ArgumentMatcher<Path[]> implements
VarargMatcher {
// to get rid of serialization warning
static final long serialVersionUID = 0;
private transient Path[] matchPaths;
DeletePathsMatcher(Path... matchPaths) {
this.matchPaths = matchPaths;
}
@Override
public boolean matches(Object varargs) {
return new EqualsBuilder().append(matchPaths, varargs).isEquals();
}
// function to get rid of FindBugs warning
private void readObject(ObjectInputStream os) throws NotSerializableException {
throw new NotSerializableException(this.getClass().getName());
}
}
/**
* Function to verify that the DeletionService object received the right
* requests.
*
* @param delService the DeletionService mock which we verify against
*
* @param user the user name to use when verifying the deletion
*
* @param timeout amount in milliseconds to wait before we decide the calls
* didn't come through
*
* @param matchPaths the paths to match in the delete calls
*
* @throws WantedButNotInvoked if the calls could not be verified
*/
static void testDeletionServiceCall(DeletionService delService, String user,
long timeout, Path... matchPaths) {
long verifyStartTime = System.currentTimeMillis();
WantedButNotInvoked notInvokedException = null;
boolean matched = false;
while (!matched && System.currentTimeMillis() < verifyStartTime + timeout) {
try {
verify(delService).delete(eq(user), (Path) eq(null),
Mockito.argThat(new DeletePathsMatcher(matchPaths)));
matched = true;
} catch (WantedButNotInvoked e) {
notInvokedException = e;
try {
Thread.sleep(50l);
} catch (InterruptedException i) {
}
}
}
if (!matched) {
throw notInvokedException;
}
return;
}
public static File[] getLocalLogDirFiles(String name, int number) {
File[] dirs = new File[number];
for (int i = 0; i < dirs.length; i++) {
dirs[i] = new File("target", name + "-localLogDir" + i).getAbsoluteFile();
}
return dirs;
}
}