YARN-661. Fixed NM to cleanup users' local directories correctly when starting up. Contributed by Omkar Vinit Joshi.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1503942 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
bf876f70fb
commit
af0d2fc3e3
|
@ -62,6 +62,9 @@ Release 2.1.1-beta - UNRELEASED
|
|||
YARN-523. Modified a test-case to validate container diagnostics on
|
||||
localization failures. (Jian He via vinodkv)
|
||||
|
||||
YARN-661. Fixed NM to cleanup users' local directories correctly when
|
||||
starting up. (Omkar Vinit Joshi via vinodkv)
|
||||
|
||||
Release 2.1.0-beta - 2013-07-02
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -18,23 +18,30 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.nodemanager;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import static java.util.concurrent.TimeUnit.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
public class DeletionService extends AbstractService {
|
||||
|
@ -42,7 +49,8 @@ public class DeletionService extends AbstractService {
|
|||
private int debugDelay;
|
||||
private final ContainerExecutor exec;
|
||||
private ScheduledThreadPoolExecutor sched;
|
||||
private final FileContext lfs = getLfs();
|
||||
private static final FileContext lfs = getLfs();
|
||||
|
||||
static final FileContext getLfs() {
|
||||
try {
|
||||
return FileContext.getLocalFSFileContext();
|
||||
|
@ -68,11 +76,23 @@ public class DeletionService extends AbstractService {
|
|||
public void delete(String user, Path subDir, Path... baseDirs) {
|
||||
// TODO if parent owned by NM, rename within parent inline
|
||||
if (debugDelay != -1) {
|
||||
sched.schedule(new FileDeletion(user, subDir, baseDirs), debugDelay,
|
||||
TimeUnit.SECONDS);
|
||||
if (baseDirs == null || baseDirs.length == 0) {
|
||||
sched.schedule(new FileDeletionTask(this, user, subDir, null),
|
||||
debugDelay, TimeUnit.SECONDS);
|
||||
} else {
|
||||
sched.schedule(
|
||||
new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs)),
|
||||
debugDelay, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) {
|
||||
if (debugDelay != -1) {
|
||||
sched.schedule(fileDeletionTask, debugDelay, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
ThreadFactory tf = new ThreadFactoryBuilder()
|
||||
|
@ -118,46 +138,184 @@ public class DeletionService extends AbstractService {
|
|||
return getServiceState() == STATE.STOPPED && sched.isTerminated();
|
||||
}
|
||||
|
||||
private class FileDeletion implements Runnable {
|
||||
final String user;
|
||||
final Path subDir;
|
||||
final Path[] baseDirs;
|
||||
FileDeletion(String user, Path subDir, Path[] baseDirs) {
|
||||
public static class FileDeletionTask implements Runnable {
|
||||
private final String user;
|
||||
private final Path subDir;
|
||||
private final List<Path> baseDirs;
|
||||
private final AtomicInteger numberOfPendingPredecessorTasks;
|
||||
private final Set<FileDeletionTask> successorTaskSet;
|
||||
private final DeletionService delService;
|
||||
// By default all tasks will start as success=true; however if any of
|
||||
// the dependent task fails then it will be marked as false in
|
||||
// fileDeletionTaskFinished().
|
||||
private boolean success;
|
||||
|
||||
private FileDeletionTask(DeletionService delService, String user,
|
||||
Path subDir, List<Path> baseDirs) {
|
||||
this.delService = delService;
|
||||
this.user = user;
|
||||
this.subDir = subDir;
|
||||
this.baseDirs = baseDirs;
|
||||
this.successorTaskSet = new HashSet<FileDeletionTask>();
|
||||
this.numberOfPendingPredecessorTasks = new AtomicInteger(0);
|
||||
success = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* increments and returns pending predecessor task count
|
||||
*/
|
||||
public int incrementAndGetPendingPredecessorTasks() {
|
||||
return numberOfPendingPredecessorTasks.incrementAndGet();
|
||||
}
|
||||
|
||||
/**
|
||||
* decrements and returns pending predecessor task count
|
||||
*/
|
||||
public int decrementAndGetPendingPredecessorTasks() {
|
||||
return numberOfPendingPredecessorTasks.decrementAndGet();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public String getUser() {
|
||||
return this.user;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public Path getSubDir() {
|
||||
return this.subDir;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public List<Path> getBaseDirs() {
|
||||
return this.baseDirs;
|
||||
}
|
||||
|
||||
public synchronized void setSuccess(boolean success) {
|
||||
this.success = success;
|
||||
}
|
||||
|
||||
public synchronized boolean getSucess() {
|
||||
return this.success;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(this);
|
||||
}
|
||||
boolean error = false;
|
||||
if (null == user) {
|
||||
if (baseDirs == null || baseDirs.length == 0) {
|
||||
if (baseDirs == null || baseDirs.size() == 0) {
|
||||
LOG.debug("NM deleting absolute path : " + subDir);
|
||||
try {
|
||||
lfs.delete(subDir, true);
|
||||
} catch (IOException e) {
|
||||
error = true;
|
||||
LOG.warn("Failed to delete " + subDir);
|
||||
}
|
||||
return;
|
||||
}
|
||||
for (Path baseDir : baseDirs) {
|
||||
Path del = subDir == null? baseDir : new Path(baseDir, subDir);
|
||||
LOG.debug("NM deleting path : " + del);
|
||||
try {
|
||||
lfs.delete(del, true);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to delete " + subDir);
|
||||
} else {
|
||||
for (Path baseDir : baseDirs) {
|
||||
Path del = subDir == null? baseDir : new Path(baseDir, subDir);
|
||||
LOG.debug("NM deleting path : " + del);
|
||||
try {
|
||||
lfs.delete(del, true);
|
||||
} catch (IOException e) {
|
||||
error = true;
|
||||
LOG.warn("Failed to delete " + subDir);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
LOG.debug("Deleting path: [" + subDir + "] as user: [" + user + "]");
|
||||
exec.deleteAsUser(user, subDir, baseDirs);
|
||||
if (baseDirs == null || baseDirs.size() == 0) {
|
||||
delService.exec.deleteAsUser(user, subDir, (Path[])null);
|
||||
} else {
|
||||
delService.exec.deleteAsUser(user, subDir,
|
||||
baseDirs.toArray(new Path[0]));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
error = true;
|
||||
LOG.warn("Failed to delete as user " + user, e);
|
||||
} catch (InterruptedException e) {
|
||||
error = true;
|
||||
LOG.warn("Failed to delete as user " + user, e);
|
||||
}
|
||||
}
|
||||
if (error) {
|
||||
setSuccess(!error);
|
||||
}
|
||||
fileDeletionTaskFinished();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuffer sb = new StringBuffer("\nFileDeletionTask : ");
|
||||
sb.append(" user : ").append(this.user);
|
||||
sb.append(" subDir : ").append(
|
||||
subDir == null ? "null" : subDir.toString());
|
||||
sb.append(" baseDir : ");
|
||||
if (baseDirs == null || baseDirs.size() == 0) {
|
||||
sb.append("null");
|
||||
} else {
|
||||
for (Path baseDir : baseDirs) {
|
||||
sb.append(baseDir.toString()).append(',');
|
||||
}
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* If there is a task dependency between say tasks 1,2,3 such that
|
||||
* task2 and task3 can be started only after task1 then we should define
|
||||
* task2 and task3 as successor tasks for task1.
|
||||
* Note:- Task dependency should be defined prior to
|
||||
* @param successorTask
|
||||
*/
|
||||
public synchronized void addFileDeletionTaskDependency(
|
||||
FileDeletionTask successorTask) {
|
||||
if (successorTaskSet.add(successorTask)) {
|
||||
successorTask.incrementAndGetPendingPredecessorTasks();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* This is called when
|
||||
* 1) Current file deletion task ran and finished.
|
||||
* 2) This can be even directly called by predecessor task if one of the
|
||||
* dependent tasks of it has failed marking its success = false.
|
||||
*/
|
||||
private synchronized void fileDeletionTaskFinished() {
|
||||
Iterator<FileDeletionTask> successorTaskI =
|
||||
this.successorTaskSet.iterator();
|
||||
while (successorTaskI.hasNext()) {
|
||||
FileDeletionTask successorTask = successorTaskI.next();
|
||||
if (!success) {
|
||||
successorTask.setSuccess(success);
|
||||
}
|
||||
int count = successorTask.decrementAndGetPendingPredecessorTasks();
|
||||
if (count == 0) {
|
||||
if (successorTask.getSucess()) {
|
||||
successorTask.delService.scheduleFileDeletionTask(successorTask);
|
||||
} else {
|
||||
successorTask.fileDeletionTaskFinished();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to create file deletion task. To be used only if we need
|
||||
* a way to define dependencies between deletion tasks.
|
||||
* @param user user on whose behalf this task is suppose to run
|
||||
* @param subDir sub directory as required in
|
||||
* {@link DeletionService#delete(String, Path, Path...)}
|
||||
* @param baseDirs base directories as required in
|
||||
* {@link DeletionService#delete(String, Path, Path...)}
|
||||
*/
|
||||
public FileDeletionTask createFileDeletionTask(String user, Path subDir,
|
||||
Path[] baseDirs) {
|
||||
return new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs));
|
||||
}
|
||||
}
|
|
@ -82,6 +82,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
|
||||
|
@ -1094,7 +1095,8 @@ public class ResourceLocalizationService extends CompositeService
|
|||
try {
|
||||
if (status.getPath().getName().matches(".*" +
|
||||
ContainerLocalizer.USERCACHE + "_DEL_.*")) {
|
||||
cleanUpFilesFromSubDir(lfs, del, status.getPath());
|
||||
LOG.info("usercache path : " + status.getPath().toString());
|
||||
cleanUpFilesPerUserDir(lfs, del, status.getPath());
|
||||
} else if (status.getPath().getName()
|
||||
.matches(".*" + NM_PRIVATE_DIR + "_DEL_.*")
|
||||
||
|
||||
|
@ -1111,17 +1113,28 @@ public class ResourceLocalizationService extends CompositeService
|
|||
}
|
||||
}
|
||||
|
||||
private void cleanUpFilesFromSubDir(FileContext lfs, DeletionService del,
|
||||
Path dirPath) throws IOException {
|
||||
RemoteIterator<FileStatus> fileStatus = lfs.listStatus(dirPath);
|
||||
if (fileStatus != null) {
|
||||
while (fileStatus.hasNext()) {
|
||||
FileStatus status = fileStatus.next();
|
||||
private void cleanUpFilesPerUserDir(FileContext lfs, DeletionService del,
|
||||
Path userDirPath) throws IOException {
|
||||
RemoteIterator<FileStatus> userDirStatus = lfs.listStatus(userDirPath);
|
||||
FileDeletionTask dependentDeletionTask =
|
||||
del.createFileDeletionTask(null, userDirPath, new Path[] {});
|
||||
if (userDirStatus != null) {
|
||||
List<FileDeletionTask> deletionTasks = new ArrayList<FileDeletionTask>();
|
||||
while (userDirStatus.hasNext()) {
|
||||
FileStatus status = userDirStatus.next();
|
||||
String owner = status.getOwner();
|
||||
del.delete(owner, status.getPath(), new Path[] {});
|
||||
FileDeletionTask deletionTask =
|
||||
del.createFileDeletionTask(owner, null,
|
||||
new Path[] { status.getPath() });
|
||||
deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
|
||||
deletionTasks.add(deletionTask);
|
||||
}
|
||||
for (FileDeletionTask task : deletionTasks) {
|
||||
del.scheduleFileDeletionTask(task);
|
||||
}
|
||||
} else {
|
||||
del.scheduleFileDeletionTask(dependentDeletionTask);
|
||||
}
|
||||
del.delete(null, dirPath, new Path[] {});
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,6 +18,11 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.nodemanager;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
@ -28,16 +33,11 @@ import org.apache.hadoop.fs.FileContext;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
|
||||
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class TestDeletionService {
|
||||
|
||||
private static final FileContext lfs = getLfs();
|
||||
|
@ -210,4 +210,79 @@ public class TestDeletionService {
|
|||
}
|
||||
assertTrue(del.isTerminated());
|
||||
}
|
||||
|
||||
@Test (timeout=60000)
|
||||
public void testFileDeletionTaskDependency() throws Exception {
|
||||
FakeDefaultContainerExecutor exec = new FakeDefaultContainerExecutor();
|
||||
Configuration conf = new Configuration();
|
||||
exec.setConf(conf);
|
||||
DeletionService del = new DeletionService(exec);
|
||||
del.init(conf);
|
||||
del.start();
|
||||
|
||||
try {
|
||||
Random r = new Random();
|
||||
long seed = r.nextLong();
|
||||
r.setSeed(seed);
|
||||
System.out.println("SEED: " + seed);
|
||||
List<Path> dirs = buildDirs(r, base, 2);
|
||||
createDirs(new Path("."), dirs);
|
||||
|
||||
// first we will try to delete sub directories which are present. This
|
||||
// should then trigger parent directory to be deleted.
|
||||
List<Path> subDirs = buildDirs(r, dirs.get(0), 2);
|
||||
|
||||
FileDeletionTask dependentDeletionTask =
|
||||
del.createFileDeletionTask(null, dirs.get(0), new Path[] {});
|
||||
List<FileDeletionTask> deletionTasks = new ArrayList<FileDeletionTask>();
|
||||
for (Path subDir : subDirs) {
|
||||
FileDeletionTask deletionTask =
|
||||
del.createFileDeletionTask(null, null, new Path[] { subDir });
|
||||
deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
|
||||
deletionTasks.add(deletionTask);
|
||||
}
|
||||
for (FileDeletionTask task : deletionTasks) {
|
||||
del.scheduleFileDeletionTask(task);
|
||||
}
|
||||
|
||||
int msecToWait = 20 * 1000;
|
||||
while (msecToWait > 0 && (lfs.util().exists(dirs.get(0)))) {
|
||||
Thread.sleep(100);
|
||||
msecToWait -= 100;
|
||||
}
|
||||
assertFalse(lfs.util().exists(dirs.get(0)));
|
||||
|
||||
|
||||
// Now we will try to delete sub directories; one of the deletion task we
|
||||
// will mark as failure and then parent directory should not be deleted.
|
||||
subDirs = buildDirs(r, dirs.get(1), 2);
|
||||
subDirs.add(new Path(dirs.get(1), "absentFile"));
|
||||
|
||||
dependentDeletionTask =
|
||||
del.createFileDeletionTask(null, dirs.get(1), new Path[] {});
|
||||
deletionTasks = new ArrayList<FileDeletionTask>();
|
||||
for (Path subDir : subDirs) {
|
||||
FileDeletionTask deletionTask =
|
||||
del.createFileDeletionTask(null, null, new Path[] { subDir });
|
||||
deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
|
||||
deletionTasks.add(deletionTask);
|
||||
}
|
||||
// marking one of the tasks as a failure.
|
||||
deletionTasks.get(2).setSuccess(false);
|
||||
for (FileDeletionTask task : deletionTasks) {
|
||||
del.scheduleFileDeletionTask(task);
|
||||
}
|
||||
|
||||
msecToWait = 20 * 1000;
|
||||
while (msecToWait > 0
|
||||
&& (lfs.util().exists(subDirs.get(0)) || lfs.util().exists(
|
||||
subDirs.get(1)))) {
|
||||
Thread.sleep(100);
|
||||
msecToWait -= 100;
|
||||
}
|
||||
assertTrue(lfs.util().exists(dirs.get(1)));
|
||||
} finally {
|
||||
del.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||
|
@ -69,8 +70,8 @@ import org.mockito.ArgumentMatcher;
|
|||
|
||||
public class TestNodeManagerReboot {
|
||||
|
||||
static final File basedir =
|
||||
new File("target", TestNodeManagerReboot.class.getName());
|
||||
static final File basedir = new File("target",
|
||||
TestNodeManagerReboot.class.getName());
|
||||
static final File logsDir = new File(basedir, "logs");
|
||||
static final File nmLocalDir = new File(basedir, "nm0");
|
||||
static final File localResourceDir = new File(basedir, "resource");
|
||||
|
@ -100,7 +101,8 @@ public class TestNodeManagerReboot {
|
|||
nm = new MyNodeManager();
|
||||
nm.start();
|
||||
|
||||
final ContainerManagementProtocol containerManager = nm.getContainerManager();
|
||||
final ContainerManagementProtocol containerManager =
|
||||
nm.getContainerManager();
|
||||
|
||||
// create files under fileCache
|
||||
createFiles(nmLocalDir.getAbsolutePath(), ContainerLocalizer.FILECACHE, 100);
|
||||
|
@ -112,16 +114,13 @@ public class TestNodeManagerReboot {
|
|||
ContainerId cId = createContainerId();
|
||||
|
||||
URL localResourceUri =
|
||||
ConverterUtils.getYarnUrlFromPath(localFS
|
||||
.makeQualified(new Path(localResourceDir.getAbsolutePath())));
|
||||
ConverterUtils.getYarnUrlFromPath(localFS.makeQualified(new Path(
|
||||
localResourceDir.getAbsolutePath())));
|
||||
|
||||
LocalResource localResource =
|
||||
Records.newRecord(LocalResource.class);
|
||||
localResource.setResource(localResourceUri);
|
||||
localResource.setSize(-1);
|
||||
localResource.setVisibility(LocalResourceVisibility.APPLICATION);
|
||||
localResource.setType(LocalResourceType.FILE);
|
||||
localResource.setTimestamp(localResourceDir.lastModified());
|
||||
LocalResource.newInstance(localResourceUri, LocalResourceType.FILE,
|
||||
LocalResourceVisibility.APPLICATION, -1,
|
||||
localResourceDir.lastModified());
|
||||
String destinationFile = "dest_file";
|
||||
Map<String, LocalResource> localResources =
|
||||
new HashMap<String, LocalResource>();
|
||||
|
@ -129,7 +128,7 @@ public class TestNodeManagerReboot {
|
|||
containerLaunchContext.setLocalResources(localResources);
|
||||
List<String> commands = new ArrayList<String>();
|
||||
containerLaunchContext.setCommands(commands);
|
||||
|
||||
|
||||
final StartContainerRequest startRequest =
|
||||
Records.newRecord(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
|
@ -137,8 +136,9 @@ public class TestNodeManagerReboot {
|
|||
startRequest.setContainerToken(TestContainerManager.createContainerToken(
|
||||
cId, 0, nodeId, destinationFile, nm.getNMContext()
|
||||
.getContainerTokenSecretManager()));
|
||||
final UserGroupInformation currentUser = UserGroupInformation
|
||||
.createRemoteUser(cId.getApplicationAttemptId().toString());
|
||||
final UserGroupInformation currentUser =
|
||||
UserGroupInformation.createRemoteUser(cId.getApplicationAttemptId()
|
||||
.toString());
|
||||
NMTokenIdentifier nmIdentifier =
|
||||
new NMTokenIdentifier(cId.getApplicationAttemptId(), nodeId, user, 123);
|
||||
currentUser.addTokenIdentifier(nmIdentifier);
|
||||
|
@ -170,27 +170,31 @@ public class TestNodeManagerReboot {
|
|||
|
||||
Assert.assertEquals(ContainerState.DONE, container.getContainerState());
|
||||
|
||||
Assert.assertTrue(
|
||||
"The container should create a subDir named currentUser: " + user +
|
||||
"under localDir/usercache",
|
||||
Assert
|
||||
.assertTrue(
|
||||
"The container should create a subDir named currentUser: " + user
|
||||
+ "under localDir/usercache",
|
||||
numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
||||
ContainerLocalizer.USERCACHE) > 0);
|
||||
ContainerLocalizer.USERCACHE) > 0);
|
||||
|
||||
Assert.assertTrue("There should be files or Dirs under nm_private when " +
|
||||
"container is launched", numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
||||
Assert.assertTrue(
|
||||
"There should be files or Dirs under nm_private when "
|
||||
+ "container is launched",
|
||||
numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
||||
ResourceLocalizationService.NM_PRIVATE_DIR) > 0);
|
||||
|
||||
// restart the NodeManager
|
||||
nm.stop();
|
||||
nm = new MyNodeManager();
|
||||
nm.start();
|
||||
nm.start();
|
||||
|
||||
numTries = 0;
|
||||
while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer
|
||||
.USERCACHE) > 0 || numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
||||
ContainerLocalizer.FILECACHE) > 0 || numOfLocalDirs(nmLocalDir
|
||||
.getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR)
|
||||
> 0) && numTries < MAX_TRIES) {
|
||||
while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
||||
ContainerLocalizer.USERCACHE) > 0
|
||||
|| numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
||||
ContainerLocalizer.FILECACHE) > 0 || numOfLocalDirs(
|
||||
nmLocalDir.getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR) > 0)
|
||||
&& numTries < MAX_TRIES) {
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
} catch (InterruptedException ex) {
|
||||
|
@ -199,21 +203,27 @@ public class TestNodeManagerReboot {
|
|||
numTries++;
|
||||
}
|
||||
|
||||
Assert.assertTrue("After NM reboots, all local files should be deleted",
|
||||
numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer
|
||||
.USERCACHE) == 0 && numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
||||
ContainerLocalizer.FILECACHE) == 0 && numOfLocalDirs(nmLocalDir
|
||||
.getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR)
|
||||
== 0);
|
||||
Assert
|
||||
.assertTrue(
|
||||
"After NM reboots, all local files should be deleted",
|
||||
numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
||||
ContainerLocalizer.USERCACHE) == 0
|
||||
&& numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
||||
ContainerLocalizer.FILECACHE) == 0
|
||||
&& numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
||||
ResourceLocalizationService.NM_PRIVATE_DIR) == 0);
|
||||
verify(delService, times(1)).delete(
|
||||
(String) isNull(),
|
||||
argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR
|
||||
+ "_DEL_")));
|
||||
(String) isNull(),
|
||||
argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR
|
||||
+ "_DEL_")));
|
||||
verify(delService, times(1)).delete((String) isNull(),
|
||||
argThat(new PathInclude(ContainerLocalizer.FILECACHE + "_DEL_")));
|
||||
verify(delService, times(1)).delete((String) isNull(),
|
||||
argThat(new PathInclude(ContainerLocalizer.USERCACHE + "_DEL_")));
|
||||
|
||||
argThat(new PathInclude(ContainerLocalizer.FILECACHE + "_DEL_")));
|
||||
verify(delService, times(1)).scheduleFileDeletionTask(
|
||||
argThat(new FileDeletionInclude(user, null,
|
||||
new String[] { destinationFile })));
|
||||
verify(delService, times(1)).scheduleFileDeletionTask(
|
||||
argThat(new FileDeletionInclude(null, ContainerLocalizer.USERCACHE
|
||||
+ "_DEL_", new String[] {})));
|
||||
}
|
||||
|
||||
private int numOfLocalDirs(String localDir, String localSubDir) {
|
||||
|
@ -238,7 +248,8 @@ public class TestNodeManagerReboot {
|
|||
|
||||
private ContainerId createContainerId() {
|
||||
ApplicationId appId = ApplicationId.newInstance(0, 0);
|
||||
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptId.newInstance(appId, 1);
|
||||
ContainerId containerId = ContainerId.newInstance(appAttemptId, 0);
|
||||
return containerId;
|
||||
}
|
||||
|
@ -253,8 +264,8 @@ public class TestNodeManagerReboot {
|
|||
@Override
|
||||
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
||||
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
||||
MockNodeStatusUpdater myNodeStatusUpdater = new MockNodeStatusUpdater(
|
||||
context, dispatcher, healthChecker, metrics);
|
||||
MockNodeStatusUpdater myNodeStatusUpdater =
|
||||
new MockNodeStatusUpdater(context, dispatcher, healthChecker, metrics);
|
||||
return myNodeStatusUpdater;
|
||||
}
|
||||
|
||||
|
@ -288,4 +299,58 @@ public class TestNodeManagerReboot {
|
|||
return ((Path) o).getName().indexOf(part) != -1;
|
||||
}
|
||||
}
|
||||
|
||||
class FileDeletionInclude extends ArgumentMatcher<FileDeletionTask> {
|
||||
final String user;
|
||||
final String subDirIncludes;
|
||||
final String[] baseDirIncludes;
|
||||
|
||||
public FileDeletionInclude(String user, String subDirIncludes,
|
||||
String [] baseDirIncludes) {
|
||||
this.user = user;
|
||||
this.subDirIncludes = subDirIncludes;
|
||||
this.baseDirIncludes = baseDirIncludes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean matches(Object o) {
|
||||
FileDeletionTask fd = (FileDeletionTask)o;
|
||||
if (fd.getUser() == null && user != null) {
|
||||
return false;
|
||||
} else if (fd.getUser() != null && user == null) {
|
||||
return false;
|
||||
} else if (fd.getUser() != null && user != null) {
|
||||
return fd.getUser().equals(user);
|
||||
}
|
||||
if (!comparePaths(fd.getSubDir(), subDirIncludes)) {
|
||||
return false;
|
||||
}
|
||||
if (baseDirIncludes == null && fd.getBaseDirs() != null) {
|
||||
return false;
|
||||
} else if (baseDirIncludes != null && fd.getBaseDirs() == null ) {
|
||||
return false;
|
||||
} else if (baseDirIncludes != null && fd.getBaseDirs() != null) {
|
||||
if (baseDirIncludes.length != fd.getBaseDirs().size()) {
|
||||
return false;
|
||||
}
|
||||
for (int i =0 ; i < baseDirIncludes.length; i++) {
|
||||
if (!comparePaths(fd.getBaseDirs().get(i), baseDirIncludes[i])) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean comparePaths(Path p1, String p2) {
|
||||
if (p1 == null && p2 != null){
|
||||
return false;
|
||||
} else if (p1 != null && p2 == null) {
|
||||
return false;
|
||||
} else if (p1 != null && p2 != null ){
|
||||
return p1.toUri().getPath().contains(p2.toString());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue