YARN-661. Fixed NM to cleanup users' local directories correctly when starting up. Contributed by Omkar Vinit Joshi.

svn merge --ignore-ancestry -c 1503942 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1503943 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-07-16 23:31:29 +00:00
parent 41c05101e7
commit 14adc33eb4
5 changed files with 397 additions and 83 deletions

View File

@ -45,6 +45,9 @@ Release 2.1.1-beta - UNRELEASED
YARN-523. Modified a test-case to validate container diagnostics on YARN-523. Modified a test-case to validate container diagnostics on
localization failures. (Jian He via vinodkv) localization failures. (Jian He via vinodkv)
YARN-661. Fixed NM to cleanup users' local directories correctly when
starting up. (Omkar Vinit Joshi via vinodkv)
Release 2.1.0-beta - 2013-07-02 Release 2.1.0-beta - 2013-07-02
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -18,23 +18,30 @@
package org.apache.hadoop.yarn.server.nodemanager; package org.apache.hadoop.yarn.server.nodemanager;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.*; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.Path; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.commons.logging.Log; import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.LogFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class DeletionService extends AbstractService { public class DeletionService extends AbstractService {
@ -42,7 +49,8 @@ public class DeletionService extends AbstractService {
private int debugDelay; private int debugDelay;
private final ContainerExecutor exec; private final ContainerExecutor exec;
private ScheduledThreadPoolExecutor sched; private ScheduledThreadPoolExecutor sched;
private final FileContext lfs = getLfs(); private static final FileContext lfs = getLfs();
static final FileContext getLfs() { static final FileContext getLfs() {
try { try {
return FileContext.getLocalFSFileContext(); return FileContext.getLocalFSFileContext();
@ -68,8 +76,20 @@ public class DeletionService extends AbstractService {
public void delete(String user, Path subDir, Path... baseDirs) { public void delete(String user, Path subDir, Path... baseDirs) {
// TODO if parent owned by NM, rename within parent inline // TODO if parent owned by NM, rename within parent inline
if (debugDelay != -1) { if (debugDelay != -1) {
sched.schedule(new FileDeletion(user, subDir, baseDirs), debugDelay, if (baseDirs == null || baseDirs.length == 0) {
TimeUnit.SECONDS); sched.schedule(new FileDeletionTask(this, user, subDir, null),
debugDelay, TimeUnit.SECONDS);
} else {
sched.schedule(
new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs)),
debugDelay, TimeUnit.SECONDS);
}
}
}
public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) {
if (debugDelay != -1) {
sched.schedule(fileDeletionTask, debugDelay, TimeUnit.SECONDS);
} }
} }
@ -118,46 +138,184 @@ public class DeletionService extends AbstractService {
return getServiceState() == STATE.STOPPED && sched.isTerminated(); return getServiceState() == STATE.STOPPED && sched.isTerminated();
} }
private class FileDeletion implements Runnable { public static class FileDeletionTask implements Runnable {
final String user; private final String user;
final Path subDir; private final Path subDir;
final Path[] baseDirs; private final List<Path> baseDirs;
FileDeletion(String user, Path subDir, Path[] baseDirs) { private final AtomicInteger numberOfPendingPredecessorTasks;
private final Set<FileDeletionTask> successorTaskSet;
private final DeletionService delService;
// By default all tasks will start as success=true; however if any of
// the dependent task fails then it will be marked as false in
// fileDeletionTaskFinished().
private boolean success;
private FileDeletionTask(DeletionService delService, String user,
Path subDir, List<Path> baseDirs) {
this.delService = delService;
this.user = user; this.user = user;
this.subDir = subDir; this.subDir = subDir;
this.baseDirs = baseDirs; this.baseDirs = baseDirs;
this.successorTaskSet = new HashSet<FileDeletionTask>();
this.numberOfPendingPredecessorTasks = new AtomicInteger(0);
success = true;
} }
/**
* increments and returns pending predecessor task count
*/
public int incrementAndGetPendingPredecessorTasks() {
return numberOfPendingPredecessorTasks.incrementAndGet();
}
/**
* decrements and returns pending predecessor task count
*/
public int decrementAndGetPendingPredecessorTasks() {
return numberOfPendingPredecessorTasks.decrementAndGet();
}
@VisibleForTesting
public String getUser() {
return this.user;
}
@VisibleForTesting
public Path getSubDir() {
return this.subDir;
}
@VisibleForTesting
public List<Path> getBaseDirs() {
return this.baseDirs;
}
public synchronized void setSuccess(boolean success) {
this.success = success;
}
public synchronized boolean getSucess() {
return this.success;
}
@Override @Override
public void run() { public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug(this);
}
boolean error = false;
if (null == user) { if (null == user) {
if (baseDirs == null || baseDirs.length == 0) { if (baseDirs == null || baseDirs.size() == 0) {
LOG.debug("NM deleting absolute path : " + subDir); LOG.debug("NM deleting absolute path : " + subDir);
try { try {
lfs.delete(subDir, true); lfs.delete(subDir, true);
} catch (IOException e) { } catch (IOException e) {
error = true;
LOG.warn("Failed to delete " + subDir); LOG.warn("Failed to delete " + subDir);
} }
return; } else {
}
for (Path baseDir : baseDirs) { for (Path baseDir : baseDirs) {
Path del = subDir == null? baseDir : new Path(baseDir, subDir); Path del = subDir == null? baseDir : new Path(baseDir, subDir);
LOG.debug("NM deleting path : " + del); LOG.debug("NM deleting path : " + del);
try { try {
lfs.delete(del, true); lfs.delete(del, true);
} catch (IOException e) { } catch (IOException e) {
error = true;
LOG.warn("Failed to delete " + subDir); LOG.warn("Failed to delete " + subDir);
} }
} }
}
} else { } else {
try { try {
LOG.debug("Deleting path: [" + subDir + "] as user: [" + user + "]"); LOG.debug("Deleting path: [" + subDir + "] as user: [" + user + "]");
exec.deleteAsUser(user, subDir, baseDirs); if (baseDirs == null || baseDirs.size() == 0) {
delService.exec.deleteAsUser(user, subDir, (Path[])null);
} else {
delService.exec.deleteAsUser(user, subDir,
baseDirs.toArray(new Path[0]));
}
} catch (IOException e) { } catch (IOException e) {
error = true;
LOG.warn("Failed to delete as user " + user, e); LOG.warn("Failed to delete as user " + user, e);
} catch (InterruptedException e) { } catch (InterruptedException e) {
error = true;
LOG.warn("Failed to delete as user " + user, e); LOG.warn("Failed to delete as user " + user, e);
} }
} }
if (error) {
setSuccess(!error);
}
fileDeletionTaskFinished();
}
@Override
public String toString() {
StringBuffer sb = new StringBuffer("\nFileDeletionTask : ");
sb.append(" user : ").append(this.user);
sb.append(" subDir : ").append(
subDir == null ? "null" : subDir.toString());
sb.append(" baseDir : ");
if (baseDirs == null || baseDirs.size() == 0) {
sb.append("null");
} else {
for (Path baseDir : baseDirs) {
sb.append(baseDir.toString()).append(',');
}
}
return sb.toString();
}
/**
* If there is a task dependency between say tasks 1,2,3 such that
* task2 and task3 can be started only after task1 then we should define
* task2 and task3 as successor tasks for task1.
* Note:- Task dependency should be defined prior to
* @param successorTask
*/
public synchronized void addFileDeletionTaskDependency(
FileDeletionTask successorTask) {
if (successorTaskSet.add(successorTask)) {
successorTask.incrementAndGetPendingPredecessorTasks();
}
}
/*
* This is called when
* 1) Current file deletion task ran and finished.
* 2) This can be even directly called by predecessor task if one of the
* dependent tasks of it has failed marking its success = false.
*/
private synchronized void fileDeletionTaskFinished() {
Iterator<FileDeletionTask> successorTaskI =
this.successorTaskSet.iterator();
while (successorTaskI.hasNext()) {
FileDeletionTask successorTask = successorTaskI.next();
if (!success) {
successorTask.setSuccess(success);
}
int count = successorTask.decrementAndGetPendingPredecessorTasks();
if (count == 0) {
if (successorTask.getSucess()) {
successorTask.delService.scheduleFileDeletionTask(successorTask);
} else {
successorTask.fileDeletionTaskFinished();
} }
} }
} }
}
}
/**
* Helper method to create file deletion task. To be used only if we need
* a way to define dependencies between deletion tasks.
* @param user user on whose behalf this task is suppose to run
* @param subDir sub directory as required in
* {@link DeletionService#delete(String, Path, Path...)}
* @param baseDirs base directories as required in
* {@link DeletionService#delete(String, Path, Path...)}
*/
public FileDeletionTask createFileDeletionTask(String user, Path subDir,
Path[] baseDirs) {
return new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs));
}
}

View File

@ -82,6 +82,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol; import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec; import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
@ -1094,7 +1095,8 @@ public class ResourceLocalizationService extends CompositeService
try { try {
if (status.getPath().getName().matches(".*" + if (status.getPath().getName().matches(".*" +
ContainerLocalizer.USERCACHE + "_DEL_.*")) { ContainerLocalizer.USERCACHE + "_DEL_.*")) {
cleanUpFilesFromSubDir(lfs, del, status.getPath()); LOG.info("usercache path : " + status.getPath().toString());
cleanUpFilesPerUserDir(lfs, del, status.getPath());
} else if (status.getPath().getName() } else if (status.getPath().getName()
.matches(".*" + NM_PRIVATE_DIR + "_DEL_.*") .matches(".*" + NM_PRIVATE_DIR + "_DEL_.*")
|| ||
@ -1111,17 +1113,28 @@ public class ResourceLocalizationService extends CompositeService
} }
} }
private void cleanUpFilesFromSubDir(FileContext lfs, DeletionService del, private void cleanUpFilesPerUserDir(FileContext lfs, DeletionService del,
Path dirPath) throws IOException { Path userDirPath) throws IOException {
RemoteIterator<FileStatus> fileStatus = lfs.listStatus(dirPath); RemoteIterator<FileStatus> userDirStatus = lfs.listStatus(userDirPath);
if (fileStatus != null) { FileDeletionTask dependentDeletionTask =
while (fileStatus.hasNext()) { del.createFileDeletionTask(null, userDirPath, new Path[] {});
FileStatus status = fileStatus.next(); if (userDirStatus != null) {
List<FileDeletionTask> deletionTasks = new ArrayList<FileDeletionTask>();
while (userDirStatus.hasNext()) {
FileStatus status = userDirStatus.next();
String owner = status.getOwner(); String owner = status.getOwner();
del.delete(owner, status.getPath(), new Path[] {}); FileDeletionTask deletionTask =
del.createFileDeletionTask(owner, null,
new Path[] { status.getPath() });
deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
deletionTasks.add(deletionTask);
} }
for (FileDeletionTask task : deletionTasks) {
del.scheduleFileDeletionTask(task);
}
} else {
del.scheduleFileDeletionTask(dependentDeletionTask);
} }
del.delete(null, dirPath, new Path[] {});
} }
} }

View File

@ -18,6 +18,11 @@
package org.apache.hadoop.yarn.server.nodemanager; package org.apache.hadoop.yarn.server.nodemanager;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -28,16 +33,11 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import static org.junit.Assert.*;
public class TestDeletionService { public class TestDeletionService {
private static final FileContext lfs = getLfs(); private static final FileContext lfs = getLfs();
@ -210,4 +210,79 @@ public class TestDeletionService {
} }
assertTrue(del.isTerminated()); assertTrue(del.isTerminated());
} }
@Test (timeout=60000)
public void testFileDeletionTaskDependency() throws Exception {
FakeDefaultContainerExecutor exec = new FakeDefaultContainerExecutor();
Configuration conf = new Configuration();
exec.setConf(conf);
DeletionService del = new DeletionService(exec);
del.init(conf);
del.start();
try {
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
System.out.println("SEED: " + seed);
List<Path> dirs = buildDirs(r, base, 2);
createDirs(new Path("."), dirs);
// first we will try to delete sub directories which are present. This
// should then trigger parent directory to be deleted.
List<Path> subDirs = buildDirs(r, dirs.get(0), 2);
FileDeletionTask dependentDeletionTask =
del.createFileDeletionTask(null, dirs.get(0), new Path[] {});
List<FileDeletionTask> deletionTasks = new ArrayList<FileDeletionTask>();
for (Path subDir : subDirs) {
FileDeletionTask deletionTask =
del.createFileDeletionTask(null, null, new Path[] { subDir });
deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
deletionTasks.add(deletionTask);
}
for (FileDeletionTask task : deletionTasks) {
del.scheduleFileDeletionTask(task);
}
int msecToWait = 20 * 1000;
while (msecToWait > 0 && (lfs.util().exists(dirs.get(0)))) {
Thread.sleep(100);
msecToWait -= 100;
}
assertFalse(lfs.util().exists(dirs.get(0)));
// Now we will try to delete sub directories; one of the deletion task we
// will mark as failure and then parent directory should not be deleted.
subDirs = buildDirs(r, dirs.get(1), 2);
subDirs.add(new Path(dirs.get(1), "absentFile"));
dependentDeletionTask =
del.createFileDeletionTask(null, dirs.get(1), new Path[] {});
deletionTasks = new ArrayList<FileDeletionTask>();
for (Path subDir : subDirs) {
FileDeletionTask deletionTask =
del.createFileDeletionTask(null, null, new Path[] { subDir });
deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
deletionTasks.add(deletionTask);
}
// marking one of the tasks as a failure.
deletionTasks.get(2).setSuccess(false);
for (FileDeletionTask task : deletionTasks) {
del.scheduleFileDeletionTask(task);
}
msecToWait = 20 * 1000;
while (msecToWait > 0
&& (lfs.util().exists(subDirs.get(0)) || lfs.util().exists(
subDirs.get(1)))) {
Thread.sleep(100);
msecToWait -= 100;
}
assertTrue(lfs.util().exists(dirs.get(1)));
} finally {
del.stop();
}
}
} }

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
@ -69,8 +70,8 @@ import org.mockito.ArgumentMatcher;
public class TestNodeManagerReboot { public class TestNodeManagerReboot {
static final File basedir = static final File basedir = new File("target",
new File("target", TestNodeManagerReboot.class.getName()); TestNodeManagerReboot.class.getName());
static final File logsDir = new File(basedir, "logs"); static final File logsDir = new File(basedir, "logs");
static final File nmLocalDir = new File(basedir, "nm0"); static final File nmLocalDir = new File(basedir, "nm0");
static final File localResourceDir = new File(basedir, "resource"); static final File localResourceDir = new File(basedir, "resource");
@ -100,7 +101,8 @@ public class TestNodeManagerReboot {
nm = new MyNodeManager(); nm = new MyNodeManager();
nm.start(); nm.start();
final ContainerManagementProtocol containerManager = nm.getContainerManager(); final ContainerManagementProtocol containerManager =
nm.getContainerManager();
// create files under fileCache // create files under fileCache
createFiles(nmLocalDir.getAbsolutePath(), ContainerLocalizer.FILECACHE, 100); createFiles(nmLocalDir.getAbsolutePath(), ContainerLocalizer.FILECACHE, 100);
@ -112,16 +114,13 @@ public class TestNodeManagerReboot {
ContainerId cId = createContainerId(); ContainerId cId = createContainerId();
URL localResourceUri = URL localResourceUri =
ConverterUtils.getYarnUrlFromPath(localFS ConverterUtils.getYarnUrlFromPath(localFS.makeQualified(new Path(
.makeQualified(new Path(localResourceDir.getAbsolutePath()))); localResourceDir.getAbsolutePath())));
LocalResource localResource = LocalResource localResource =
Records.newRecord(LocalResource.class); LocalResource.newInstance(localResourceUri, LocalResourceType.FILE,
localResource.setResource(localResourceUri); LocalResourceVisibility.APPLICATION, -1,
localResource.setSize(-1); localResourceDir.lastModified());
localResource.setVisibility(LocalResourceVisibility.APPLICATION);
localResource.setType(LocalResourceType.FILE);
localResource.setTimestamp(localResourceDir.lastModified());
String destinationFile = "dest_file"; String destinationFile = "dest_file";
Map<String, LocalResource> localResources = Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>(); new HashMap<String, LocalResource>();
@ -137,8 +136,9 @@ public class TestNodeManagerReboot {
startRequest.setContainerToken(TestContainerManager.createContainerToken( startRequest.setContainerToken(TestContainerManager.createContainerToken(
cId, 0, nodeId, destinationFile, nm.getNMContext() cId, 0, nodeId, destinationFile, nm.getNMContext()
.getContainerTokenSecretManager())); .getContainerTokenSecretManager()));
final UserGroupInformation currentUser = UserGroupInformation final UserGroupInformation currentUser =
.createRemoteUser(cId.getApplicationAttemptId().toString()); UserGroupInformation.createRemoteUser(cId.getApplicationAttemptId()
.toString());
NMTokenIdentifier nmIdentifier = NMTokenIdentifier nmIdentifier =
new NMTokenIdentifier(cId.getApplicationAttemptId(), nodeId, user, 123); new NMTokenIdentifier(cId.getApplicationAttemptId(), nodeId, user, 123);
currentUser.addTokenIdentifier(nmIdentifier); currentUser.addTokenIdentifier(nmIdentifier);
@ -170,14 +170,17 @@ public class TestNodeManagerReboot {
Assert.assertEquals(ContainerState.DONE, container.getContainerState()); Assert.assertEquals(ContainerState.DONE, container.getContainerState());
Assert.assertTrue( Assert
"The container should create a subDir named currentUser: " + user + .assertTrue(
"under localDir/usercache", "The container should create a subDir named currentUser: " + user
+ "under localDir/usercache",
numOfLocalDirs(nmLocalDir.getAbsolutePath(), numOfLocalDirs(nmLocalDir.getAbsolutePath(),
ContainerLocalizer.USERCACHE) > 0); ContainerLocalizer.USERCACHE) > 0);
Assert.assertTrue("There should be files or Dirs under nm_private when " + Assert.assertTrue(
"container is launched", numOfLocalDirs(nmLocalDir.getAbsolutePath(), "There should be files or Dirs under nm_private when "
+ "container is launched",
numOfLocalDirs(nmLocalDir.getAbsolutePath(),
ResourceLocalizationService.NM_PRIVATE_DIR) > 0); ResourceLocalizationService.NM_PRIVATE_DIR) > 0);
// restart the NodeManager // restart the NodeManager
@ -186,11 +189,12 @@ public class TestNodeManagerReboot {
nm.start(); nm.start();
numTries = 0; numTries = 0;
while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(),
.USERCACHE) > 0 || numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer.USERCACHE) > 0
ContainerLocalizer.FILECACHE) > 0 || numOfLocalDirs(nmLocalDir || numOfLocalDirs(nmLocalDir.getAbsolutePath(),
.getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR) ContainerLocalizer.FILECACHE) > 0 || numOfLocalDirs(
> 0) && numTries < MAX_TRIES) { nmLocalDir.getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR) > 0)
&& numTries < MAX_TRIES) {
try { try {
Thread.sleep(500); Thread.sleep(500);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
@ -199,21 +203,27 @@ public class TestNodeManagerReboot {
numTries++; numTries++;
} }
Assert.assertTrue("After NM reboots, all local files should be deleted", Assert
numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer .assertTrue(
.USERCACHE) == 0 && numOfLocalDirs(nmLocalDir.getAbsolutePath(), "After NM reboots, all local files should be deleted",
ContainerLocalizer.FILECACHE) == 0 && numOfLocalDirs(nmLocalDir numOfLocalDirs(nmLocalDir.getAbsolutePath(),
.getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR) ContainerLocalizer.USERCACHE) == 0
== 0); && numOfLocalDirs(nmLocalDir.getAbsolutePath(),
ContainerLocalizer.FILECACHE) == 0
&& numOfLocalDirs(nmLocalDir.getAbsolutePath(),
ResourceLocalizationService.NM_PRIVATE_DIR) == 0);
verify(delService, times(1)).delete( verify(delService, times(1)).delete(
(String) isNull(), (String) isNull(),
argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR
+ "_DEL_"))); + "_DEL_")));
verify(delService, times(1)).delete((String) isNull(), verify(delService, times(1)).delete((String) isNull(),
argThat(new PathInclude(ContainerLocalizer.FILECACHE + "_DEL_"))); argThat(new PathInclude(ContainerLocalizer.FILECACHE + "_DEL_")));
verify(delService, times(1)).delete((String) isNull(), verify(delService, times(1)).scheduleFileDeletionTask(
argThat(new PathInclude(ContainerLocalizer.USERCACHE + "_DEL_"))); argThat(new FileDeletionInclude(user, null,
new String[] { destinationFile })));
verify(delService, times(1)).scheduleFileDeletionTask(
argThat(new FileDeletionInclude(null, ContainerLocalizer.USERCACHE
+ "_DEL_", new String[] {})));
} }
private int numOfLocalDirs(String localDir, String localSubDir) { private int numOfLocalDirs(String localDir, String localSubDir) {
@ -238,7 +248,8 @@ public class TestNodeManagerReboot {
private ContainerId createContainerId() { private ContainerId createContainerId() {
ApplicationId appId = ApplicationId.newInstance(0, 0); ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId = ContainerId.newInstance(appAttemptId, 0); ContainerId containerId = ContainerId.newInstance(appAttemptId, 0);
return containerId; return containerId;
} }
@ -253,8 +264,8 @@ public class TestNodeManagerReboot {
@Override @Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context, protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
MockNodeStatusUpdater myNodeStatusUpdater = new MockNodeStatusUpdater( MockNodeStatusUpdater myNodeStatusUpdater =
context, dispatcher, healthChecker, metrics); new MockNodeStatusUpdater(context, dispatcher, healthChecker, metrics);
return myNodeStatusUpdater; return myNodeStatusUpdater;
} }
@ -288,4 +299,58 @@ public class TestNodeManagerReboot {
return ((Path) o).getName().indexOf(part) != -1; return ((Path) o).getName().indexOf(part) != -1;
} }
} }
class FileDeletionInclude extends ArgumentMatcher<FileDeletionTask> {
final String user;
final String subDirIncludes;
final String[] baseDirIncludes;
public FileDeletionInclude(String user, String subDirIncludes,
String [] baseDirIncludes) {
this.user = user;
this.subDirIncludes = subDirIncludes;
this.baseDirIncludes = baseDirIncludes;
}
@Override
public boolean matches(Object o) {
FileDeletionTask fd = (FileDeletionTask)o;
if (fd.getUser() == null && user != null) {
return false;
} else if (fd.getUser() != null && user == null) {
return false;
} else if (fd.getUser() != null && user != null) {
return fd.getUser().equals(user);
}
if (!comparePaths(fd.getSubDir(), subDirIncludes)) {
return false;
}
if (baseDirIncludes == null && fd.getBaseDirs() != null) {
return false;
} else if (baseDirIncludes != null && fd.getBaseDirs() == null ) {
return false;
} else if (baseDirIncludes != null && fd.getBaseDirs() != null) {
if (baseDirIncludes.length != fd.getBaseDirs().size()) {
return false;
}
for (int i =0 ; i < baseDirIncludes.length; i++) {
if (!comparePaths(fd.getBaseDirs().get(i), baseDirIncludes[i])) {
return false;
}
}
}
return true;
}
public boolean comparePaths(Path p1, String p2) {
if (p1 == null && p2 != null){
return false;
} else if (p1 != null && p2 == null) {
return false;
} else if (p1 != null && p2 != null ){
return p1.toUri().getPath().contains(p2.toString());
}
return true;
}
}
} }