diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 4d6cb00b23e..695d008a6f1 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -45,6 +45,9 @@ Release 2.1.1-beta - UNRELEASED YARN-523. Modified a test-case to validate container diagnostics on localization failures. (Jian He via vinodkv) + YARN-661. Fixed NM to cleanup users' local directories correctly when + starting up. (Omkar Vinit Joshi via vinodkv) + Release 2.1.0-beta - 2013-07-02 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java index 1ab5a37e802..45504fdcdd7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java @@ -18,23 +18,30 @@ package org.apache.hadoop.yarn.server.nodemanager; +import static java.util.concurrent.TimeUnit.SECONDS; + import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import static java.util.concurrent.TimeUnit.*; +import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hadoop.fs.Path; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; public class DeletionService extends AbstractService { @@ -42,7 +49,8 @@ public class DeletionService extends AbstractService { private int debugDelay; private final ContainerExecutor exec; private ScheduledThreadPoolExecutor sched; - private final FileContext lfs = getLfs(); + private static final FileContext lfs = getLfs(); + static final FileContext getLfs() { try { return FileContext.getLocalFSFileContext(); @@ -68,11 +76,23 @@ public class DeletionService extends AbstractService { public void delete(String user, Path subDir, Path... baseDirs) { // TODO if parent owned by NM, rename within parent inline if (debugDelay != -1) { - sched.schedule(new FileDeletion(user, subDir, baseDirs), debugDelay, - TimeUnit.SECONDS); + if (baseDirs == null || baseDirs.length == 0) { + sched.schedule(new FileDeletionTask(this, user, subDir, null), + debugDelay, TimeUnit.SECONDS); + } else { + sched.schedule( + new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs)), + debugDelay, TimeUnit.SECONDS); + } } } - + + public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) { + if (debugDelay != -1) { + sched.schedule(fileDeletionTask, debugDelay, TimeUnit.SECONDS); + } + } + @Override protected void serviceInit(Configuration conf) throws Exception { ThreadFactory tf = new ThreadFactoryBuilder() @@ -118,46 +138,184 @@ public class DeletionService extends AbstractService { return getServiceState() == STATE.STOPPED && sched.isTerminated(); } - private class FileDeletion implements Runnable { - final String user; - final Path subDir; - final Path[] baseDirs; - FileDeletion(String user, Path subDir, Path[] baseDirs) { + public static class FileDeletionTask implements Runnable { + private final String user; + private final Path subDir; + private final List baseDirs; + private final AtomicInteger numberOfPendingPredecessorTasks; + private final Set successorTaskSet; + private final DeletionService delService; + // By default all tasks will start as success=true; however if any of + // the dependent task fails then it will be marked as false in + // fileDeletionTaskFinished(). + private boolean success; + + private FileDeletionTask(DeletionService delService, String user, + Path subDir, List baseDirs) { + this.delService = delService; this.user = user; this.subDir = subDir; this.baseDirs = baseDirs; + this.successorTaskSet = new HashSet(); + this.numberOfPendingPredecessorTasks = new AtomicInteger(0); + success = true; } + + /** + * increments and returns pending predecessor task count + */ + public int incrementAndGetPendingPredecessorTasks() { + return numberOfPendingPredecessorTasks.incrementAndGet(); + } + + /** + * decrements and returns pending predecessor task count + */ + public int decrementAndGetPendingPredecessorTasks() { + return numberOfPendingPredecessorTasks.decrementAndGet(); + } + + @VisibleForTesting + public String getUser() { + return this.user; + } + + @VisibleForTesting + public Path getSubDir() { + return this.subDir; + } + + @VisibleForTesting + public List getBaseDirs() { + return this.baseDirs; + } + + public synchronized void setSuccess(boolean success) { + this.success = success; + } + + public synchronized boolean getSucess() { + return this.success; + } + @Override public void run() { + if (LOG.isDebugEnabled()) { + LOG.debug(this); + } + boolean error = false; if (null == user) { - if (baseDirs == null || baseDirs.length == 0) { + if (baseDirs == null || baseDirs.size() == 0) { LOG.debug("NM deleting absolute path : " + subDir); try { lfs.delete(subDir, true); } catch (IOException e) { + error = true; LOG.warn("Failed to delete " + subDir); } - return; - } - for (Path baseDir : baseDirs) { - Path del = subDir == null? baseDir : new Path(baseDir, subDir); - LOG.debug("NM deleting path : " + del); - try { - lfs.delete(del, true); - } catch (IOException e) { - LOG.warn("Failed to delete " + subDir); + } else { + for (Path baseDir : baseDirs) { + Path del = subDir == null? baseDir : new Path(baseDir, subDir); + LOG.debug("NM deleting path : " + del); + try { + lfs.delete(del, true); + } catch (IOException e) { + error = true; + LOG.warn("Failed to delete " + subDir); + } } } } else { try { LOG.debug("Deleting path: [" + subDir + "] as user: [" + user + "]"); - exec.deleteAsUser(user, subDir, baseDirs); + if (baseDirs == null || baseDirs.size() == 0) { + delService.exec.deleteAsUser(user, subDir, (Path[])null); + } else { + delService.exec.deleteAsUser(user, subDir, + baseDirs.toArray(new Path[0])); + } } catch (IOException e) { + error = true; LOG.warn("Failed to delete as user " + user, e); } catch (InterruptedException e) { + error = true; LOG.warn("Failed to delete as user " + user, e); } } + if (error) { + setSuccess(!error); + } + fileDeletionTaskFinished(); + } + + @Override + public String toString() { + StringBuffer sb = new StringBuffer("\nFileDeletionTask : "); + sb.append(" user : ").append(this.user); + sb.append(" subDir : ").append( + subDir == null ? "null" : subDir.toString()); + sb.append(" baseDir : "); + if (baseDirs == null || baseDirs.size() == 0) { + sb.append("null"); + } else { + for (Path baseDir : baseDirs) { + sb.append(baseDir.toString()).append(','); + } + } + return sb.toString(); + } + + /** + * If there is a task dependency between say tasks 1,2,3 such that + * task2 and task3 can be started only after task1 then we should define + * task2 and task3 as successor tasks for task1. + * Note:- Task dependency should be defined prior to + * @param successorTask + */ + public synchronized void addFileDeletionTaskDependency( + FileDeletionTask successorTask) { + if (successorTaskSet.add(successorTask)) { + successorTask.incrementAndGetPendingPredecessorTasks(); + } + } + + /* + * This is called when + * 1) Current file deletion task ran and finished. + * 2) This can be even directly called by predecessor task if one of the + * dependent tasks of it has failed marking its success = false. + */ + private synchronized void fileDeletionTaskFinished() { + Iterator successorTaskI = + this.successorTaskSet.iterator(); + while (successorTaskI.hasNext()) { + FileDeletionTask successorTask = successorTaskI.next(); + if (!success) { + successorTask.setSuccess(success); + } + int count = successorTask.decrementAndGetPendingPredecessorTasks(); + if (count == 0) { + if (successorTask.getSucess()) { + successorTask.delService.scheduleFileDeletionTask(successorTask); + } else { + successorTask.fileDeletionTaskFinished(); + } + } + } } } -} + + /** + * Helper method to create file deletion task. To be used only if we need + * a way to define dependencies between deletion tasks. + * @param user user on whose behalf this task is suppose to run + * @param subDir sub directory as required in + * {@link DeletionService#delete(String, Path, Path...)} + * @param baseDirs base directories as required in + * {@link DeletionService#delete(String, Path, Path...)} + */ + public FileDeletionTask createFileDeletionTask(String user, Path subDir, + Path[] baseDirs) { + return new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs)); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 3324ddcb75b..1fdb0821708 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -82,6 +82,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask; import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol; import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus; @@ -1094,7 +1095,8 @@ public class ResourceLocalizationService extends CompositeService try { if (status.getPath().getName().matches(".*" + ContainerLocalizer.USERCACHE + "_DEL_.*")) { - cleanUpFilesFromSubDir(lfs, del, status.getPath()); + LOG.info("usercache path : " + status.getPath().toString()); + cleanUpFilesPerUserDir(lfs, del, status.getPath()); } else if (status.getPath().getName() .matches(".*" + NM_PRIVATE_DIR + "_DEL_.*") || @@ -1111,17 +1113,28 @@ public class ResourceLocalizationService extends CompositeService } } - private void cleanUpFilesFromSubDir(FileContext lfs, DeletionService del, - Path dirPath) throws IOException { - RemoteIterator fileStatus = lfs.listStatus(dirPath); - if (fileStatus != null) { - while (fileStatus.hasNext()) { - FileStatus status = fileStatus.next(); + private void cleanUpFilesPerUserDir(FileContext lfs, DeletionService del, + Path userDirPath) throws IOException { + RemoteIterator userDirStatus = lfs.listStatus(userDirPath); + FileDeletionTask dependentDeletionTask = + del.createFileDeletionTask(null, userDirPath, new Path[] {}); + if (userDirStatus != null) { + List deletionTasks = new ArrayList(); + while (userDirStatus.hasNext()) { + FileStatus status = userDirStatus.next(); String owner = status.getOwner(); - del.delete(owner, status.getPath(), new Path[] {}); + FileDeletionTask deletionTask = + del.createFileDeletionTask(owner, null, + new Path[] { status.getPath() }); + deletionTask.addFileDeletionTaskDependency(dependentDeletionTask); + deletionTasks.add(deletionTask); } + for (FileDeletionTask task : deletionTasks) { + del.scheduleFileDeletionTask(task); + } + } else { + del.scheduleFileDeletionTask(dependentDeletionTask); } - del.delete(null, dirPath, new Path[] {}); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java index a17e4e70efb..69208c506ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java @@ -18,6 +18,11 @@ package org.apache.hadoop.yarn.server.nodemanager; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -28,16 +33,11 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; -import org.apache.hadoop.yarn.server.nodemanager.DeletionService; - - +import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask; import org.junit.AfterClass; import org.junit.Test; import org.mockito.Mockito; -import static org.junit.Assert.*; - public class TestDeletionService { private static final FileContext lfs = getLfs(); @@ -210,4 +210,79 @@ public class TestDeletionService { } assertTrue(del.isTerminated()); } + + @Test (timeout=60000) + public void testFileDeletionTaskDependency() throws Exception { + FakeDefaultContainerExecutor exec = new FakeDefaultContainerExecutor(); + Configuration conf = new Configuration(); + exec.setConf(conf); + DeletionService del = new DeletionService(exec); + del.init(conf); + del.start(); + + try { + Random r = new Random(); + long seed = r.nextLong(); + r.setSeed(seed); + System.out.println("SEED: " + seed); + List dirs = buildDirs(r, base, 2); + createDirs(new Path("."), dirs); + + // first we will try to delete sub directories which are present. This + // should then trigger parent directory to be deleted. + List subDirs = buildDirs(r, dirs.get(0), 2); + + FileDeletionTask dependentDeletionTask = + del.createFileDeletionTask(null, dirs.get(0), new Path[] {}); + List deletionTasks = new ArrayList(); + for (Path subDir : subDirs) { + FileDeletionTask deletionTask = + del.createFileDeletionTask(null, null, new Path[] { subDir }); + deletionTask.addFileDeletionTaskDependency(dependentDeletionTask); + deletionTasks.add(deletionTask); + } + for (FileDeletionTask task : deletionTasks) { + del.scheduleFileDeletionTask(task); + } + + int msecToWait = 20 * 1000; + while (msecToWait > 0 && (lfs.util().exists(dirs.get(0)))) { + Thread.sleep(100); + msecToWait -= 100; + } + assertFalse(lfs.util().exists(dirs.get(0))); + + + // Now we will try to delete sub directories; one of the deletion task we + // will mark as failure and then parent directory should not be deleted. + subDirs = buildDirs(r, dirs.get(1), 2); + subDirs.add(new Path(dirs.get(1), "absentFile")); + + dependentDeletionTask = + del.createFileDeletionTask(null, dirs.get(1), new Path[] {}); + deletionTasks = new ArrayList(); + for (Path subDir : subDirs) { + FileDeletionTask deletionTask = + del.createFileDeletionTask(null, null, new Path[] { subDir }); + deletionTask.addFileDeletionTaskDependency(dependentDeletionTask); + deletionTasks.add(deletionTask); + } + // marking one of the tasks as a failure. + deletionTasks.get(2).setSuccess(false); + for (FileDeletionTask task : deletionTasks) { + del.scheduleFileDeletionTask(task); + } + + msecToWait = 20 * 1000; + while (msecToWait > 0 + && (lfs.util().exists(subDirs.get(0)) || lfs.util().exists( + subDirs.get(1)))) { + Thread.sleep(100); + msecToWait -= 100; + } + assertTrue(lfs.util().exists(dirs.get(1))); + } finally { + del.stop(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java index 3b88b03c7e9..e07a28d0a9a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask; import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; @@ -69,8 +70,8 @@ import org.mockito.ArgumentMatcher; public class TestNodeManagerReboot { - static final File basedir = - new File("target", TestNodeManagerReboot.class.getName()); + static final File basedir = new File("target", + TestNodeManagerReboot.class.getName()); static final File logsDir = new File(basedir, "logs"); static final File nmLocalDir = new File(basedir, "nm0"); static final File localResourceDir = new File(basedir, "resource"); @@ -100,7 +101,8 @@ public class TestNodeManagerReboot { nm = new MyNodeManager(); nm.start(); - final ContainerManagementProtocol containerManager = nm.getContainerManager(); + final ContainerManagementProtocol containerManager = + nm.getContainerManager(); // create files under fileCache createFiles(nmLocalDir.getAbsolutePath(), ContainerLocalizer.FILECACHE, 100); @@ -112,16 +114,13 @@ public class TestNodeManagerReboot { ContainerId cId = createContainerId(); URL localResourceUri = - ConverterUtils.getYarnUrlFromPath(localFS - .makeQualified(new Path(localResourceDir.getAbsolutePath()))); + ConverterUtils.getYarnUrlFromPath(localFS.makeQualified(new Path( + localResourceDir.getAbsolutePath()))); LocalResource localResource = - Records.newRecord(LocalResource.class); - localResource.setResource(localResourceUri); - localResource.setSize(-1); - localResource.setVisibility(LocalResourceVisibility.APPLICATION); - localResource.setType(LocalResourceType.FILE); - localResource.setTimestamp(localResourceDir.lastModified()); + LocalResource.newInstance(localResourceUri, LocalResourceType.FILE, + LocalResourceVisibility.APPLICATION, -1, + localResourceDir.lastModified()); String destinationFile = "dest_file"; Map localResources = new HashMap(); @@ -129,7 +128,7 @@ public class TestNodeManagerReboot { containerLaunchContext.setLocalResources(localResources); List commands = new ArrayList(); containerLaunchContext.setCommands(commands); - + final StartContainerRequest startRequest = Records.newRecord(StartContainerRequest.class); startRequest.setContainerLaunchContext(containerLaunchContext); @@ -137,8 +136,9 @@ public class TestNodeManagerReboot { startRequest.setContainerToken(TestContainerManager.createContainerToken( cId, 0, nodeId, destinationFile, nm.getNMContext() .getContainerTokenSecretManager())); - final UserGroupInformation currentUser = UserGroupInformation - .createRemoteUser(cId.getApplicationAttemptId().toString()); + final UserGroupInformation currentUser = + UserGroupInformation.createRemoteUser(cId.getApplicationAttemptId() + .toString()); NMTokenIdentifier nmIdentifier = new NMTokenIdentifier(cId.getApplicationAttemptId(), nodeId, user, 123); currentUser.addTokenIdentifier(nmIdentifier); @@ -170,27 +170,31 @@ public class TestNodeManagerReboot { Assert.assertEquals(ContainerState.DONE, container.getContainerState()); - Assert.assertTrue( - "The container should create a subDir named currentUser: " + user + - "under localDir/usercache", + Assert + .assertTrue( + "The container should create a subDir named currentUser: " + user + + "under localDir/usercache", numOfLocalDirs(nmLocalDir.getAbsolutePath(), - ContainerLocalizer.USERCACHE) > 0); + ContainerLocalizer.USERCACHE) > 0); - Assert.assertTrue("There should be files or Dirs under nm_private when " + - "container is launched", numOfLocalDirs(nmLocalDir.getAbsolutePath(), + Assert.assertTrue( + "There should be files or Dirs under nm_private when " + + "container is launched", + numOfLocalDirs(nmLocalDir.getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR) > 0); // restart the NodeManager nm.stop(); nm = new MyNodeManager(); - nm.start(); + nm.start(); numTries = 0; - while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer - .USERCACHE) > 0 || numOfLocalDirs(nmLocalDir.getAbsolutePath(), - ContainerLocalizer.FILECACHE) > 0 || numOfLocalDirs(nmLocalDir - .getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR) - > 0) && numTries < MAX_TRIES) { + while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), + ContainerLocalizer.USERCACHE) > 0 + || numOfLocalDirs(nmLocalDir.getAbsolutePath(), + ContainerLocalizer.FILECACHE) > 0 || numOfLocalDirs( + nmLocalDir.getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR) > 0) + && numTries < MAX_TRIES) { try { Thread.sleep(500); } catch (InterruptedException ex) { @@ -199,21 +203,27 @@ public class TestNodeManagerReboot { numTries++; } - Assert.assertTrue("After NM reboots, all local files should be deleted", - numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer - .USERCACHE) == 0 && numOfLocalDirs(nmLocalDir.getAbsolutePath(), - ContainerLocalizer.FILECACHE) == 0 && numOfLocalDirs(nmLocalDir - .getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR) - == 0); + Assert + .assertTrue( + "After NM reboots, all local files should be deleted", + numOfLocalDirs(nmLocalDir.getAbsolutePath(), + ContainerLocalizer.USERCACHE) == 0 + && numOfLocalDirs(nmLocalDir.getAbsolutePath(), + ContainerLocalizer.FILECACHE) == 0 + && numOfLocalDirs(nmLocalDir.getAbsolutePath(), + ResourceLocalizationService.NM_PRIVATE_DIR) == 0); verify(delService, times(1)).delete( - (String) isNull(), - argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR - + "_DEL_"))); + (String) isNull(), + argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR + + "_DEL_"))); verify(delService, times(1)).delete((String) isNull(), - argThat(new PathInclude(ContainerLocalizer.FILECACHE + "_DEL_"))); - verify(delService, times(1)).delete((String) isNull(), - argThat(new PathInclude(ContainerLocalizer.USERCACHE + "_DEL_"))); - + argThat(new PathInclude(ContainerLocalizer.FILECACHE + "_DEL_"))); + verify(delService, times(1)).scheduleFileDeletionTask( + argThat(new FileDeletionInclude(user, null, + new String[] { destinationFile }))); + verify(delService, times(1)).scheduleFileDeletionTask( + argThat(new FileDeletionInclude(null, ContainerLocalizer.USERCACHE + + "_DEL_", new String[] {}))); } private int numOfLocalDirs(String localDir, String localSubDir) { @@ -238,7 +248,8 @@ public class TestNodeManagerReboot { private ContainerId createContainerId() { ApplicationId appId = ApplicationId.newInstance(0, 0); - ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); ContainerId containerId = ContainerId.newInstance(appAttemptId, 0); return containerId; } @@ -253,8 +264,8 @@ public class TestNodeManagerReboot { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { - MockNodeStatusUpdater myNodeStatusUpdater = new MockNodeStatusUpdater( - context, dispatcher, healthChecker, metrics); + MockNodeStatusUpdater myNodeStatusUpdater = + new MockNodeStatusUpdater(context, dispatcher, healthChecker, metrics); return myNodeStatusUpdater; } @@ -288,4 +299,58 @@ public class TestNodeManagerReboot { return ((Path) o).getName().indexOf(part) != -1; } } + + class FileDeletionInclude extends ArgumentMatcher { + final String user; + final String subDirIncludes; + final String[] baseDirIncludes; + + public FileDeletionInclude(String user, String subDirIncludes, + String [] baseDirIncludes) { + this.user = user; + this.subDirIncludes = subDirIncludes; + this.baseDirIncludes = baseDirIncludes; + } + + @Override + public boolean matches(Object o) { + FileDeletionTask fd = (FileDeletionTask)o; + if (fd.getUser() == null && user != null) { + return false; + } else if (fd.getUser() != null && user == null) { + return false; + } else if (fd.getUser() != null && user != null) { + return fd.getUser().equals(user); + } + if (!comparePaths(fd.getSubDir(), subDirIncludes)) { + return false; + } + if (baseDirIncludes == null && fd.getBaseDirs() != null) { + return false; + } else if (baseDirIncludes != null && fd.getBaseDirs() == null ) { + return false; + } else if (baseDirIncludes != null && fd.getBaseDirs() != null) { + if (baseDirIncludes.length != fd.getBaseDirs().size()) { + return false; + } + for (int i =0 ; i < baseDirIncludes.length; i++) { + if (!comparePaths(fd.getBaseDirs().get(i), baseDirIncludes[i])) { + return false; + } + } + } + return true; + } + + public boolean comparePaths(Path p1, String p2) { + if (p1 == null && p2 != null){ + return false; + } else if (p1 != null && p2 == null) { + return false; + } else if (p1 != null && p2 != null ){ + return p1.toUri().getPath().contains(p2.toString()); + } + return true; + } + } }