YARN-6366. Refactor the NodeManager DeletionService to support additional DeletionTask types. Contributed by Shane Kumpf.
(cherry picked from commit 547f18cb96
)
This commit is contained in:
parent
a3ad1a39c5
commit
1eecde3355
|
@ -21,11 +21,8 @@ package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -38,86 +35,141 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileContext;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
|
import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
|
import org.apache.hadoop.yarn.server.nodemanager.api.impl.pb.NMProtoUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery.DeletionTaskRecoveryInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
public class DeletionService extends AbstractService {
|
public class DeletionService extends AbstractService {
|
||||||
static final Log LOG = LogFactory.getLog(DeletionService.class);
|
|
||||||
private int debugDelay;
|
|
||||||
private final ContainerExecutor exec;
|
|
||||||
private ScheduledThreadPoolExecutor sched;
|
|
||||||
private static final FileContext lfs = getLfs();
|
|
||||||
private final NMStateStoreService stateStore;
|
|
||||||
private AtomicInteger nextTaskId = new AtomicInteger(0);
|
|
||||||
|
|
||||||
static final FileContext getLfs() {
|
private static final Log LOG = LogFactory.getLog(DeletionService.class);
|
||||||
try {
|
|
||||||
return FileContext.getLocalFSFileContext();
|
private int debugDelay;
|
||||||
} catch (UnsupportedFileSystemException e) {
|
private final ContainerExecutor containerExecutor;
|
||||||
throw new RuntimeException(e);
|
private final NMStateStoreService stateStore;
|
||||||
}
|
private ScheduledThreadPoolExecutor sched;
|
||||||
}
|
private AtomicInteger nextTaskId = new AtomicInteger(0);
|
||||||
|
|
||||||
public DeletionService(ContainerExecutor exec) {
|
public DeletionService(ContainerExecutor exec) {
|
||||||
this(exec, new NMNullStateStoreService());
|
this(exec, new NMNullStateStoreService());
|
||||||
}
|
}
|
||||||
|
|
||||||
public DeletionService(ContainerExecutor exec,
|
public DeletionService(ContainerExecutor containerExecutor,
|
||||||
NMStateStoreService stateStore) {
|
NMStateStoreService stateStore) {
|
||||||
super(DeletionService.class.getName());
|
super(DeletionService.class.getName());
|
||||||
this.exec = exec;
|
this.containerExecutor = containerExecutor;
|
||||||
this.debugDelay = 0;
|
this.debugDelay = 0;
|
||||||
this.stateStore = stateStore;
|
this.stateStore = stateStore;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
public int getDebugDelay() {
|
||||||
* Delete the path(s) as this user.
|
return debugDelay;
|
||||||
* @param user The user to delete as, or the JVM user if null
|
}
|
||||||
* @param subDir the sub directory name
|
|
||||||
* @param baseDirs the base directories which contains the subDir's
|
public ContainerExecutor getContainerExecutor() {
|
||||||
*/
|
return containerExecutor;
|
||||||
public void delete(String user, Path subDir, Path... baseDirs) {
|
}
|
||||||
// TODO if parent owned by NM, rename within parent inline
|
|
||||||
|
public NMStateStoreService getStateStore() {
|
||||||
|
return stateStore;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void delete(DeletionTask deletionTask) {
|
||||||
if (debugDelay != -1) {
|
if (debugDelay != -1) {
|
||||||
List<Path> baseDirList = null;
|
if (LOG.isDebugEnabled()) {
|
||||||
if (baseDirs != null && baseDirs.length != 0) {
|
String msg = String.format("Scheduling DeletionTask (delay %d) : %s",
|
||||||
baseDirList = Arrays.asList(baseDirs);
|
debugDelay, deletionTask.toString());
|
||||||
|
LOG.debug(msg);
|
||||||
}
|
}
|
||||||
FileDeletionTask task =
|
recordDeletionTaskInStateStore(deletionTask);
|
||||||
new FileDeletionTask(this, user, subDir, baseDirList);
|
sched.schedule(deletionTask, debugDelay, TimeUnit.SECONDS);
|
||||||
recordDeletionTaskInStateStore(task);
|
|
||||||
sched.schedule(task, debugDelay, TimeUnit.SECONDS);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) {
|
private void recover(NMStateStoreService.RecoveredDeletionServiceState state)
|
||||||
if (debugDelay != -1) {
|
throws IOException {
|
||||||
recordDeletionTaskInStateStore(fileDeletionTask);
|
List<DeletionServiceDeleteTaskProto> taskProtos = state.getTasks();
|
||||||
sched.schedule(fileDeletionTask, debugDelay, TimeUnit.SECONDS);
|
Map<Integer, DeletionTaskRecoveryInfo> idToInfoMap =
|
||||||
|
new HashMap<>(taskProtos.size());
|
||||||
|
Set<Integer> successorTasks = new HashSet<>();
|
||||||
|
for (DeletionServiceDeleteTaskProto proto : taskProtos) {
|
||||||
|
DeletionTaskRecoveryInfo info =
|
||||||
|
NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this);
|
||||||
|
idToInfoMap.put(info.getTask().getTaskId(), info);
|
||||||
|
nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId()));
|
||||||
|
successorTasks.addAll(info.getSuccessorTaskIds());
|
||||||
|
}
|
||||||
|
|
||||||
|
// restore the task dependencies and schedule the deletion tasks that
|
||||||
|
// have no predecessors
|
||||||
|
final long now = System.currentTimeMillis();
|
||||||
|
for (DeletionTaskRecoveryInfo info : idToInfoMap.values()) {
|
||||||
|
for (Integer successorId : info.getSuccessorTaskIds()){
|
||||||
|
DeletionTaskRecoveryInfo successor = idToInfoMap.get(successorId);
|
||||||
|
if (successor != null) {
|
||||||
|
info.getTask().addDeletionTaskDependency(successor.getTask());
|
||||||
|
} else {
|
||||||
|
LOG.error("Unable to locate dependency task for deletion task "
|
||||||
|
+ info.getTask().getTaskId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!successorTasks.contains(info.getTask().getTaskId())) {
|
||||||
|
long msecTilDeletion = info.getDeletionTimestamp() - now;
|
||||||
|
sched.schedule(info.getTask(), msecTilDeletion, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private int generateTaskId() {
|
||||||
|
// get the next ID but avoid an invalid ID
|
||||||
|
int taskId = nextTaskId.incrementAndGet();
|
||||||
|
while (taskId == DeletionTask.INVALID_TASK_ID) {
|
||||||
|
taskId = nextTaskId.incrementAndGet();
|
||||||
|
}
|
||||||
|
return taskId;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void recordDeletionTaskInStateStore(DeletionTask task) {
|
||||||
|
if (!stateStore.canRecover()) {
|
||||||
|
// optimize the case where we aren't really recording
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (task.getTaskId() != DeletionTask.INVALID_TASK_ID) {
|
||||||
|
return; // task already recorded
|
||||||
|
}
|
||||||
|
|
||||||
|
task.setTaskId(generateTaskId());
|
||||||
|
|
||||||
|
// store successors first to ensure task IDs have been generated for them
|
||||||
|
DeletionTask[] successors = task.getSuccessorTasks();
|
||||||
|
for (DeletionTask successor : successors) {
|
||||||
|
recordDeletionTaskInStateStore(successor);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
stateStore.storeDeletionTask(task.getTaskId(),
|
||||||
|
task.convertDeletionTaskToProto());
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Unable to store deletion task " + task.getTaskId(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceInit(Configuration conf) throws Exception {
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
ThreadFactory tf = new ThreadFactoryBuilder()
|
ThreadFactory tf = new ThreadFactoryBuilder()
|
||||||
.setNameFormat("DeletionService #%d")
|
.setNameFormat("DeletionService #%d")
|
||||||
.build();
|
.build();
|
||||||
if (conf != null) {
|
if (conf != null) {
|
||||||
sched = new HadoopScheduledThreadPoolExecutor(
|
sched = new HadoopScheduledThreadPoolExecutor(
|
||||||
conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT,
|
conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT,
|
||||||
YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf);
|
YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf);
|
||||||
debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);
|
debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);
|
||||||
} else {
|
} else {
|
||||||
sched = new HadoopScheduledThreadPoolExecutor(
|
sched = new HadoopScheduledThreadPoolExecutor(
|
||||||
|
@ -132,15 +184,14 @@ public class DeletionService extends AbstractService {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStop() throws Exception {
|
public void serviceStop() throws Exception {
|
||||||
if (sched != null) {
|
if (sched != null) {
|
||||||
sched.shutdown();
|
sched.shutdown();
|
||||||
boolean terminated = false;
|
boolean terminated = false;
|
||||||
try {
|
try {
|
||||||
terminated = sched.awaitTermination(10, SECONDS);
|
terminated = sched.awaitTermination(10, SECONDS);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) { }
|
||||||
}
|
if (!terminated) {
|
||||||
if (terminated != true) {
|
|
||||||
sched.shutdownNow();
|
sched.shutdownNow();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -156,343 +207,4 @@ public class DeletionService extends AbstractService {
|
||||||
public boolean isTerminated() {
|
public boolean isTerminated() {
|
||||||
return getServiceState() == STATE.STOPPED && sched.isTerminated();
|
return getServiceState() == STATE.STOPPED && sched.isTerminated();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class FileDeletionTask implements Runnable {
|
|
||||||
public static final int INVALID_TASK_ID = -1;
|
|
||||||
private int taskId;
|
|
||||||
private final String user;
|
|
||||||
private final Path subDir;
|
|
||||||
private final List<Path> baseDirs;
|
|
||||||
private final AtomicInteger numberOfPendingPredecessorTasks;
|
|
||||||
private final Set<FileDeletionTask> successorTaskSet;
|
|
||||||
private final DeletionService delService;
|
|
||||||
// By default all tasks will start as success=true; however if any of
|
|
||||||
// the dependent task fails then it will be marked as false in
|
|
||||||
// fileDeletionTaskFinished().
|
|
||||||
private boolean success;
|
|
||||||
|
|
||||||
private FileDeletionTask(DeletionService delService, String user,
|
|
||||||
Path subDir, List<Path> baseDirs) {
|
|
||||||
this(INVALID_TASK_ID, delService, user, subDir, baseDirs);
|
|
||||||
}
|
|
||||||
|
|
||||||
private FileDeletionTask(int taskId, DeletionService delService,
|
|
||||||
String user, Path subDir, List<Path> baseDirs) {
|
|
||||||
this.taskId = taskId;
|
|
||||||
this.delService = delService;
|
|
||||||
this.user = user;
|
|
||||||
this.subDir = subDir;
|
|
||||||
this.baseDirs = baseDirs;
|
|
||||||
this.successorTaskSet = new HashSet<FileDeletionTask>();
|
|
||||||
this.numberOfPendingPredecessorTasks = new AtomicInteger(0);
|
|
||||||
success = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* increments and returns pending predecessor task count
|
|
||||||
*/
|
|
||||||
public int incrementAndGetPendingPredecessorTasks() {
|
|
||||||
return numberOfPendingPredecessorTasks.incrementAndGet();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* decrements and returns pending predecessor task count
|
|
||||||
*/
|
|
||||||
public int decrementAndGetPendingPredecessorTasks() {
|
|
||||||
return numberOfPendingPredecessorTasks.decrementAndGet();
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public String getUser() {
|
|
||||||
return this.user;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public Path getSubDir() {
|
|
||||||
return this.subDir;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public List<Path> getBaseDirs() {
|
|
||||||
return this.baseDirs;
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized void setSuccess(boolean success) {
|
|
||||||
this.success = success;
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized boolean getSucess() {
|
|
||||||
return this.success;
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized FileDeletionTask[] getSuccessorTasks() {
|
|
||||||
FileDeletionTask[] successors =
|
|
||||||
new FileDeletionTask[successorTaskSet.size()];
|
|
||||||
return successorTaskSet.toArray(successors);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug(this);
|
|
||||||
}
|
|
||||||
boolean error = false;
|
|
||||||
if (null == user) {
|
|
||||||
if (baseDirs == null || baseDirs.size() == 0) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("NM deleting absolute path : " + subDir);
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
lfs.delete(subDir, true);
|
|
||||||
} catch (IOException e) {
|
|
||||||
error = true;
|
|
||||||
LOG.warn("Failed to delete " + subDir);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
for (Path baseDir : baseDirs) {
|
|
||||||
Path del = subDir == null? baseDir : new Path(baseDir, subDir);
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("NM deleting path : " + del);
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
lfs.delete(del, true);
|
|
||||||
} catch (IOException e) {
|
|
||||||
error = true;
|
|
||||||
LOG.warn("Failed to delete " + subDir);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug(
|
|
||||||
"Deleting path: [" + subDir + "] as user: [" + user + "]");
|
|
||||||
}
|
|
||||||
if (baseDirs == null || baseDirs.size() == 0) {
|
|
||||||
delService.exec.deleteAsUser(new DeletionAsUserContext.Builder()
|
|
||||||
.setUser(user)
|
|
||||||
.setSubDir(subDir)
|
|
||||||
.build());
|
|
||||||
} else {
|
|
||||||
delService.exec.deleteAsUser(new DeletionAsUserContext.Builder()
|
|
||||||
.setUser(user)
|
|
||||||
.setSubDir(subDir)
|
|
||||||
.setBasedirs(baseDirs.toArray(new Path[0]))
|
|
||||||
.build());
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
error = true;
|
|
||||||
LOG.warn("Failed to delete as user " + user, e);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
error = true;
|
|
||||||
LOG.warn("Failed to delete as user " + user, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (error) {
|
|
||||||
setSuccess(!error);
|
|
||||||
}
|
|
||||||
fileDeletionTaskFinished();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
StringBuffer sb = new StringBuffer("\nFileDeletionTask : ");
|
|
||||||
sb.append(" user : ").append(this.user);
|
|
||||||
sb.append(" subDir : ").append(
|
|
||||||
subDir == null ? "null" : subDir.toString());
|
|
||||||
sb.append(" baseDir : ");
|
|
||||||
if (baseDirs == null || baseDirs.size() == 0) {
|
|
||||||
sb.append("null");
|
|
||||||
} else {
|
|
||||||
for (Path baseDir : baseDirs) {
|
|
||||||
sb.append(baseDir.toString()).append(',');
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return sb.toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* If there is a task dependency between say tasks 1,2,3 such that
|
|
||||||
* task2 and task3 can be started only after task1 then we should define
|
|
||||||
* task2 and task3 as successor tasks for task1.
|
|
||||||
* Note:- Task dependency should be defined prior to
|
|
||||||
* @param successorTask
|
|
||||||
*/
|
|
||||||
public synchronized void addFileDeletionTaskDependency(
|
|
||||||
FileDeletionTask successorTask) {
|
|
||||||
if (successorTaskSet.add(successorTask)) {
|
|
||||||
successorTask.incrementAndGetPendingPredecessorTasks();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* This is called when
|
|
||||||
* 1) Current file deletion task ran and finished.
|
|
||||||
* 2) This can be even directly called by predecessor task if one of the
|
|
||||||
* dependent tasks of it has failed marking its success = false.
|
|
||||||
*/
|
|
||||||
private synchronized void fileDeletionTaskFinished() {
|
|
||||||
try {
|
|
||||||
delService.stateStore.removeDeletionTask(taskId);
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.error("Unable to remove deletion task " + taskId
|
|
||||||
+ " from state store", e);
|
|
||||||
}
|
|
||||||
Iterator<FileDeletionTask> successorTaskI =
|
|
||||||
this.successorTaskSet.iterator();
|
|
||||||
while (successorTaskI.hasNext()) {
|
|
||||||
FileDeletionTask successorTask = successorTaskI.next();
|
|
||||||
if (!success) {
|
|
||||||
successorTask.setSuccess(success);
|
|
||||||
}
|
|
||||||
int count = successorTask.decrementAndGetPendingPredecessorTasks();
|
|
||||||
if (count == 0) {
|
|
||||||
if (successorTask.getSucess()) {
|
|
||||||
successorTask.delService.scheduleFileDeletionTask(successorTask);
|
|
||||||
} else {
|
|
||||||
successorTask.fileDeletionTaskFinished();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Helper method to create file deletion task. To be used only if we need
|
|
||||||
* a way to define dependencies between deletion tasks.
|
|
||||||
* @param user user on whose behalf this task is suppose to run
|
|
||||||
* @param subDir sub directory as required in
|
|
||||||
* {@link DeletionService#delete(String, Path, Path...)}
|
|
||||||
* @param baseDirs base directories as required in
|
|
||||||
* {@link DeletionService#delete(String, Path, Path...)}
|
|
||||||
*/
|
|
||||||
public FileDeletionTask createFileDeletionTask(String user, Path subDir,
|
|
||||||
Path[] baseDirs) {
|
|
||||||
return new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void recover(RecoveredDeletionServiceState state)
|
|
||||||
throws IOException {
|
|
||||||
List<DeletionServiceDeleteTaskProto> taskProtos = state.getTasks();
|
|
||||||
Map<Integer, DeletionTaskRecoveryInfo> idToInfoMap =
|
|
||||||
new HashMap<Integer, DeletionTaskRecoveryInfo>(taskProtos.size());
|
|
||||||
Set<Integer> successorTasks = new HashSet<Integer>();
|
|
||||||
for (DeletionServiceDeleteTaskProto proto : taskProtos) {
|
|
||||||
DeletionTaskRecoveryInfo info = parseTaskProto(proto);
|
|
||||||
idToInfoMap.put(info.task.taskId, info);
|
|
||||||
nextTaskId.set(Math.max(nextTaskId.get(), info.task.taskId));
|
|
||||||
successorTasks.addAll(info.successorTaskIds);
|
|
||||||
}
|
|
||||||
|
|
||||||
// restore the task dependencies and schedule the deletion tasks that
|
|
||||||
// have no predecessors
|
|
||||||
final long now = System.currentTimeMillis();
|
|
||||||
for (DeletionTaskRecoveryInfo info : idToInfoMap.values()) {
|
|
||||||
for (Integer successorId : info.successorTaskIds){
|
|
||||||
DeletionTaskRecoveryInfo successor = idToInfoMap.get(successorId);
|
|
||||||
if (successor != null) {
|
|
||||||
info.task.addFileDeletionTaskDependency(successor.task);
|
|
||||||
} else {
|
|
||||||
LOG.error("Unable to locate dependency task for deletion task "
|
|
||||||
+ info.task.taskId + " at " + info.task.getSubDir());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (!successorTasks.contains(info.task.taskId)) {
|
|
||||||
long msecTilDeletion = info.deletionTimestamp - now;
|
|
||||||
sched.schedule(info.task, msecTilDeletion, TimeUnit.MILLISECONDS);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private DeletionTaskRecoveryInfo parseTaskProto(
|
|
||||||
DeletionServiceDeleteTaskProto proto) throws IOException {
|
|
||||||
int taskId = proto.getId();
|
|
||||||
String user = proto.hasUser() ? proto.getUser() : null;
|
|
||||||
Path subdir = null;
|
|
||||||
List<Path> basePaths = null;
|
|
||||||
if (proto.hasSubdir()) {
|
|
||||||
subdir = new Path(proto.getSubdir());
|
|
||||||
}
|
|
||||||
List<String> basedirs = proto.getBasedirsList();
|
|
||||||
if (basedirs != null && basedirs.size() > 0) {
|
|
||||||
basePaths = new ArrayList<Path>(basedirs.size());
|
|
||||||
for (String basedir : basedirs) {
|
|
||||||
basePaths.add(new Path(basedir));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
FileDeletionTask task = new FileDeletionTask(taskId, this, user,
|
|
||||||
subdir, basePaths);
|
|
||||||
return new DeletionTaskRecoveryInfo(task,
|
|
||||||
proto.getSuccessorIdsList(),
|
|
||||||
proto.getDeletionTime());
|
|
||||||
}
|
|
||||||
|
|
||||||
private int generateTaskId() {
|
|
||||||
// get the next ID but avoid an invalid ID
|
|
||||||
int taskId = nextTaskId.incrementAndGet();
|
|
||||||
while (taskId == FileDeletionTask.INVALID_TASK_ID) {
|
|
||||||
taskId = nextTaskId.incrementAndGet();
|
|
||||||
}
|
|
||||||
return taskId;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void recordDeletionTaskInStateStore(FileDeletionTask task) {
|
|
||||||
if (!stateStore.canRecover()) {
|
|
||||||
// optimize the case where we aren't really recording
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (task.taskId != FileDeletionTask.INVALID_TASK_ID) {
|
|
||||||
return; // task already recorded
|
|
||||||
}
|
|
||||||
|
|
||||||
task.taskId = generateTaskId();
|
|
||||||
|
|
||||||
FileDeletionTask[] successors = task.getSuccessorTasks();
|
|
||||||
|
|
||||||
// store successors first to ensure task IDs have been generated for them
|
|
||||||
for (FileDeletionTask successor : successors) {
|
|
||||||
recordDeletionTaskInStateStore(successor);
|
|
||||||
}
|
|
||||||
|
|
||||||
DeletionServiceDeleteTaskProto.Builder builder =
|
|
||||||
DeletionServiceDeleteTaskProto.newBuilder();
|
|
||||||
builder.setId(task.taskId);
|
|
||||||
if (task.getUser() != null) {
|
|
||||||
builder.setUser(task.getUser());
|
|
||||||
}
|
|
||||||
if (task.getSubDir() != null) {
|
|
||||||
builder.setSubdir(task.getSubDir().toString());
|
|
||||||
}
|
|
||||||
builder.setDeletionTime(System.currentTimeMillis() +
|
|
||||||
TimeUnit.MILLISECONDS.convert(debugDelay, TimeUnit.SECONDS));
|
|
||||||
if (task.getBaseDirs() != null) {
|
|
||||||
for (Path dir : task.getBaseDirs()) {
|
|
||||||
builder.addBasedirs(dir.toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for (FileDeletionTask successor : successors) {
|
|
||||||
builder.addSuccessorIds(successor.taskId);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
stateStore.storeDeletionTask(task.taskId, builder.build());
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.error("Unable to store deletion task " + task.taskId + " for "
|
|
||||||
+ task.getSubDir(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class DeletionTaskRecoveryInfo {
|
|
||||||
FileDeletionTask task;
|
|
||||||
List<Integer> successorTaskIds;
|
|
||||||
long deletionTimestamp;
|
|
||||||
|
|
||||||
public DeletionTaskRecoveryInfo(FileDeletionTask task,
|
|
||||||
List<Integer> successorTaskIds, long deletionTimestamp) {
|
|
||||||
this.task = task;
|
|
||||||
this.successorTaskIds = successorTaskIds;
|
|
||||||
this.deletionTimestamp = deletionTimestamp;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
|
@ -0,0 +1,110 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.api.impl.pb;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery.DeletionTaskRecoveryInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTaskType;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utilities for converting from PB representations.
|
||||||
|
*/
|
||||||
|
public final class NMProtoUtils {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(NMProtoUtils.class);
|
||||||
|
|
||||||
|
private NMProtoUtils() { }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert the Protobuf representation into a {@link DeletionTask}.
|
||||||
|
*
|
||||||
|
* @param proto the Protobuf representation for the DeletionTask
|
||||||
|
* @param deletionService the {@link DeletionService}
|
||||||
|
* @return the converted {@link DeletionTask}
|
||||||
|
*/
|
||||||
|
public static DeletionTask convertProtoToDeletionTask(
|
||||||
|
DeletionServiceDeleteTaskProto proto, DeletionService deletionService) {
|
||||||
|
int taskId = proto.getId();
|
||||||
|
if (proto.hasTaskType() && proto.getTaskType() != null) {
|
||||||
|
if (proto.getTaskType().equals(DeletionTaskType.FILE.name())) {
|
||||||
|
LOG.debug("Converting recovered FileDeletionTask");
|
||||||
|
return convertProtoToFileDeletionTask(proto, deletionService, taskId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.debug("Unable to get task type, trying FileDeletionTask");
|
||||||
|
return convertProtoToFileDeletionTask(proto, deletionService, taskId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert the Protobuf representation into the {@link FileDeletionTask}.
|
||||||
|
*
|
||||||
|
* @param proto the Protobuf representation of the {@link FileDeletionTask}
|
||||||
|
* @param deletionService the {@link DeletionService}.
|
||||||
|
* @param taskId the ID of the {@link DeletionTask}.
|
||||||
|
* @return the populated {@link FileDeletionTask}.
|
||||||
|
*/
|
||||||
|
public static FileDeletionTask convertProtoToFileDeletionTask(
|
||||||
|
DeletionServiceDeleteTaskProto proto, DeletionService deletionService,
|
||||||
|
int taskId) {
|
||||||
|
String user = proto.hasUser() ? proto.getUser() : null;
|
||||||
|
Path subdir = null;
|
||||||
|
if (proto.hasSubdir()) {
|
||||||
|
subdir = new Path(proto.getSubdir());
|
||||||
|
}
|
||||||
|
List<Path> basePaths = null;
|
||||||
|
List<String> basedirs = proto.getBasedirsList();
|
||||||
|
if (basedirs != null && basedirs.size() > 0) {
|
||||||
|
basePaths = new ArrayList<>(basedirs.size());
|
||||||
|
for (String basedir : basedirs) {
|
||||||
|
basePaths.add(new Path(basedir));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return new FileDeletionTask(taskId, deletionService, user, subdir,
|
||||||
|
basePaths);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert the Protobuf representation to the {@link DeletionTaskRecoveryInfo}
|
||||||
|
* representation.
|
||||||
|
*
|
||||||
|
* @param proto the Protobuf representation of the {@link DeletionTask}
|
||||||
|
* @param deletionService the {@link DeletionService}
|
||||||
|
* @return the populated {@link DeletionTaskRecoveryInfo}
|
||||||
|
*/
|
||||||
|
public static DeletionTaskRecoveryInfo convertProtoToDeletionTaskRecoveryInfo(
|
||||||
|
DeletionServiceDeleteTaskProto proto, DeletionService deletionService) {
|
||||||
|
DeletionTask deletionTask =
|
||||||
|
NMProtoUtils.convertProtoToDeletionTask(proto, deletionService);
|
||||||
|
List<Integer> successorTaskIds = new ArrayList<>();
|
||||||
|
if (proto.getSuccessorIdsList() != null &&
|
||||||
|
!proto.getSuccessorIdsList().isEmpty()) {
|
||||||
|
successorTaskIds = proto.getSuccessorIdsList();
|
||||||
|
}
|
||||||
|
long deletionTimestamp = proto.getDeletionTime();
|
||||||
|
return new DeletionTaskRecoveryInfo(deletionTask, successorTaskIds,
|
||||||
|
deletionTimestamp);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,25 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
/**
|
||||||
|
* Package containing classes for working with Protobuf.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.api.impl.pb;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
@ -0,0 +1,73 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Encapsulates the recovery info needed to recover a DeletionTask from the NM
|
||||||
|
* state store.
|
||||||
|
*/
|
||||||
|
public class DeletionTaskRecoveryInfo {
|
||||||
|
|
||||||
|
private DeletionTask task;
|
||||||
|
private List<Integer> successorTaskIds;
|
||||||
|
private long deletionTimestamp;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Information needed for recovering the DeletionTask.
|
||||||
|
*
|
||||||
|
* @param task the DeletionTask
|
||||||
|
* @param successorTaskIds the dependent DeletionTasks.
|
||||||
|
* @param deletionTimestamp the scheduled times of deletion.
|
||||||
|
*/
|
||||||
|
public DeletionTaskRecoveryInfo(DeletionTask task,
|
||||||
|
List<Integer> successorTaskIds, long deletionTimestamp) {
|
||||||
|
this.task = task;
|
||||||
|
this.successorTaskIds = successorTaskIds;
|
||||||
|
this.deletionTimestamp = deletionTimestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the recovered DeletionTask.
|
||||||
|
*
|
||||||
|
* @return the recovered DeletionTask.
|
||||||
|
*/
|
||||||
|
public DeletionTask getTask() {
|
||||||
|
return task;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return all of the dependent DeletionTasks.
|
||||||
|
*
|
||||||
|
* @return the dependent DeletionTasks.
|
||||||
|
*/
|
||||||
|
public List<Integer> getSuccessorTaskIds() {
|
||||||
|
return successorTaskIds;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the deletion timestamp.
|
||||||
|
*
|
||||||
|
* @return the deletion timestamp.
|
||||||
|
*/
|
||||||
|
public long getDeletionTimestamp() {
|
||||||
|
return deletionTimestamp;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,25 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
/**
|
||||||
|
* Package containing classes for recovering DeletionTasks.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
@ -0,0 +1,258 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* DeletionTasks are supplied to the {@link DeletionService} for deletion.
|
||||||
|
*/
|
||||||
|
public abstract class DeletionTask implements Runnable {
|
||||||
|
|
||||||
|
static final Log LOG = LogFactory.getLog(DeletionTask.class);
|
||||||
|
|
||||||
|
public static final int INVALID_TASK_ID = -1;
|
||||||
|
|
||||||
|
private int taskId;
|
||||||
|
private String user;
|
||||||
|
private DeletionTaskType deletionTaskType;
|
||||||
|
private DeletionService deletionService;
|
||||||
|
private final AtomicInteger numberOfPendingPredecessorTasks;
|
||||||
|
private final Set<DeletionTask> successorTaskSet;
|
||||||
|
// By default all tasks will start as success=true; however if any of
|
||||||
|
// the dependent task fails then it will be marked as false in
|
||||||
|
// deletionTaskFinished().
|
||||||
|
private boolean success;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deletion task with taskId and default values.
|
||||||
|
*
|
||||||
|
* @param taskId the ID of the task, if previously set.
|
||||||
|
* @param deletionService the {@link DeletionService}.
|
||||||
|
* @param user the user associated with the delete.
|
||||||
|
* @param deletionTaskType the {@link DeletionTaskType}.
|
||||||
|
*/
|
||||||
|
public DeletionTask(int taskId, DeletionService deletionService, String user,
|
||||||
|
DeletionTaskType deletionTaskType) {
|
||||||
|
this(taskId, deletionService, user, new AtomicInteger(0),
|
||||||
|
new HashSet<DeletionTask>(), deletionTaskType);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deletion task with taskId and user supplied values.
|
||||||
|
*
|
||||||
|
* @param taskId the ID of the task, if previously set.
|
||||||
|
* @param deletionService the {@link DeletionService}.
|
||||||
|
* @param user the user associated with the delete.
|
||||||
|
* @param numberOfPendingPredecessorTasks Number of pending tasks.
|
||||||
|
* @param successorTaskSet the list of successor DeletionTasks
|
||||||
|
* @param deletionTaskType the {@link DeletionTaskType}.
|
||||||
|
*/
|
||||||
|
public DeletionTask(int taskId, DeletionService deletionService, String user,
|
||||||
|
AtomicInteger numberOfPendingPredecessorTasks,
|
||||||
|
Set<DeletionTask> successorTaskSet, DeletionTaskType deletionTaskType) {
|
||||||
|
this.taskId = taskId;
|
||||||
|
this.deletionService = deletionService;
|
||||||
|
this.user = user;
|
||||||
|
this.numberOfPendingPredecessorTasks = numberOfPendingPredecessorTasks;
|
||||||
|
this.successorTaskSet = successorTaskSet;
|
||||||
|
this.deletionTaskType = deletionTaskType;
|
||||||
|
success = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the taskId for the DeletionTask.
|
||||||
|
*
|
||||||
|
* @return the taskId.
|
||||||
|
*/
|
||||||
|
public int getTaskId() {
|
||||||
|
return taskId;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the taskId for the DeletionTask.
|
||||||
|
*
|
||||||
|
* @param taskId the taskId.
|
||||||
|
*/
|
||||||
|
public void setTaskId(int taskId) {
|
||||||
|
this.taskId = taskId;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The the user assoicated with the DeletionTask.
|
||||||
|
*
|
||||||
|
* @return the user name.
|
||||||
|
*/
|
||||||
|
public String getUser() {
|
||||||
|
return user;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the {@link DeletionService} for this DeletionTask.
|
||||||
|
*
|
||||||
|
* @return the {@link DeletionService}.
|
||||||
|
*/
|
||||||
|
public DeletionService getDeletionService() {
|
||||||
|
return deletionService;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the {@link DeletionTaskType} for this DeletionTask.
|
||||||
|
*
|
||||||
|
* @return the {@link DeletionTaskType}.
|
||||||
|
*/
|
||||||
|
public DeletionTaskType getDeletionTaskType() {
|
||||||
|
return deletionTaskType;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the DeletionTask run status.
|
||||||
|
*
|
||||||
|
* @param success the status of the running DeletionTask.
|
||||||
|
*/
|
||||||
|
public synchronized void setSuccess(boolean success) {
|
||||||
|
this.success = success;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the DeletionTask run status.
|
||||||
|
*
|
||||||
|
* @return the status of the running DeletionTask.
|
||||||
|
*/
|
||||||
|
public synchronized boolean getSucess() {
|
||||||
|
return this.success;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the list of successor tasks for the DeletionTask.
|
||||||
|
*
|
||||||
|
* @return the list of successor tasks.
|
||||||
|
*/
|
||||||
|
public synchronized DeletionTask[] getSuccessorTasks() {
|
||||||
|
DeletionTask[] successors = new DeletionTask[successorTaskSet.size()];
|
||||||
|
return successorTaskSet.toArray(successors);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert the DeletionTask to the Protobuf representation for storing in the
|
||||||
|
* state store and recovery.
|
||||||
|
*
|
||||||
|
* @return the protobuf representation of the DeletionTask.
|
||||||
|
*/
|
||||||
|
public abstract DeletionServiceDeleteTaskProto convertDeletionTaskToProto();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a dependent DeletionTask.
|
||||||
|
*
|
||||||
|
* If there is a task dependency between say tasks 1,2,3 such that
|
||||||
|
* task2 and task3 can be started only after task1 then we should define
|
||||||
|
* task2 and task3 as successor tasks for task1.
|
||||||
|
* Note:- Task dependency should be defined prior to calling delete.
|
||||||
|
*
|
||||||
|
* @param successorTask the DeletionTask the depends on this DeletionTask.
|
||||||
|
*/
|
||||||
|
public synchronized void addDeletionTaskDependency(
|
||||||
|
DeletionTask successorTask) {
|
||||||
|
if (successorTaskSet.add(successorTask)) {
|
||||||
|
successorTask.incrementAndGetPendingPredecessorTasks();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increments and returns pending predecessor task count.
|
||||||
|
*
|
||||||
|
* @return the number of pending predecessor DeletionTasks.
|
||||||
|
*/
|
||||||
|
public int incrementAndGetPendingPredecessorTasks() {
|
||||||
|
return numberOfPendingPredecessorTasks.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Decrements and returns pending predecessor task count.
|
||||||
|
*
|
||||||
|
* @return the number of pending predecessor DeletionTasks.
|
||||||
|
*/
|
||||||
|
public int decrementAndGetPendingPredecessorTasks() {
|
||||||
|
return numberOfPendingPredecessorTasks.decrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes the DeletionTask from the state store and validates that successor
|
||||||
|
* tasks have been scheduled and completed.
|
||||||
|
*
|
||||||
|
* This is called when:
|
||||||
|
* 1) Current deletion task ran and finished.
|
||||||
|
* 2) When directly called by predecessor task if one of the
|
||||||
|
* dependent tasks of it has failed marking its success = false.
|
||||||
|
*/
|
||||||
|
synchronized void deletionTaskFinished() {
|
||||||
|
try {
|
||||||
|
NMStateStoreService stateStore = deletionService.getStateStore();
|
||||||
|
stateStore.removeDeletionTask(taskId);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Unable to remove deletion task " + taskId
|
||||||
|
+ " from state store", e);
|
||||||
|
}
|
||||||
|
Iterator<DeletionTask> successorTaskI = this.successorTaskSet.iterator();
|
||||||
|
while (successorTaskI.hasNext()) {
|
||||||
|
DeletionTask successorTask = successorTaskI.next();
|
||||||
|
if (!success) {
|
||||||
|
successorTask.setSuccess(success);
|
||||||
|
}
|
||||||
|
int count = successorTask.decrementAndGetPendingPredecessorTasks();
|
||||||
|
if (count == 0) {
|
||||||
|
if (successorTask.getSucess()) {
|
||||||
|
successorTask.deletionService.delete(successorTask);
|
||||||
|
} else {
|
||||||
|
successorTask.deletionTaskFinished();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the Protobuf builder with the base DeletionTask attributes.
|
||||||
|
*
|
||||||
|
* @return pre-populated Buidler with the base attributes.
|
||||||
|
*/
|
||||||
|
DeletionServiceDeleteTaskProto.Builder getBaseDeletionTaskProtoBuilder() {
|
||||||
|
DeletionServiceDeleteTaskProto.Builder builder =
|
||||||
|
DeletionServiceDeleteTaskProto.newBuilder();
|
||||||
|
builder.setId(getTaskId());
|
||||||
|
if (getUser() != null) {
|
||||||
|
builder.setUser(getUser());
|
||||||
|
}
|
||||||
|
builder.setDeletionTime(System.currentTimeMillis() +
|
||||||
|
TimeUnit.MILLISECONDS.convert(getDeletionService().getDebugDelay(),
|
||||||
|
TimeUnit.SECONDS));
|
||||||
|
for (DeletionTask successor : getSuccessorTasks()) {
|
||||||
|
builder.addSuccessorIds(successor.getTaskId());
|
||||||
|
}
|
||||||
|
return builder;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,24 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Available types of {@link DeletionTask}s.
|
||||||
|
*/
|
||||||
|
public enum DeletionTaskType {
|
||||||
|
FILE
|
||||||
|
}
|
|
@ -0,0 +1,202 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileContext;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link DeletionTask} handling the removal of files (and directories).
|
||||||
|
*/
|
||||||
|
public class FileDeletionTask extends DeletionTask implements Runnable {
|
||||||
|
|
||||||
|
private final Path subDir;
|
||||||
|
private final List<Path> baseDirs;
|
||||||
|
private static final FileContext lfs = getLfs();
|
||||||
|
|
||||||
|
private static FileContext getLfs() {
|
||||||
|
try {
|
||||||
|
return FileContext.getLocalFSFileContext();
|
||||||
|
} catch (UnsupportedFileSystemException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct a FileDeletionTask with the default INVALID_TASK_ID.
|
||||||
|
*
|
||||||
|
* @param deletionService the {@link DeletionService}.
|
||||||
|
* @param user the user deleting the file.
|
||||||
|
* @param subDir the subdirectory to delete.
|
||||||
|
* @param baseDirs the base directories containing the subdir.
|
||||||
|
*/
|
||||||
|
public FileDeletionTask(DeletionService deletionService, String user,
|
||||||
|
Path subDir, List<Path> baseDirs) {
|
||||||
|
this(INVALID_TASK_ID, deletionService, user, subDir, baseDirs);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct a FileDeletionTask with the default INVALID_TASK_ID.
|
||||||
|
*
|
||||||
|
* @param taskId the ID of the task, if previously set.
|
||||||
|
* @param deletionService the {@link DeletionService}.
|
||||||
|
* @param user the user deleting the file.
|
||||||
|
* @param subDir the subdirectory to delete.
|
||||||
|
* @param baseDirs the base directories containing the subdir.
|
||||||
|
*/
|
||||||
|
public FileDeletionTask(int taskId, DeletionService deletionService,
|
||||||
|
String user, Path subDir, List<Path> baseDirs) {
|
||||||
|
super(taskId, deletionService, user, DeletionTaskType.FILE);
|
||||||
|
this.subDir = subDir;
|
||||||
|
this.baseDirs = baseDirs;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the subdirectory to delete.
|
||||||
|
*
|
||||||
|
* @return the subDir for the FileDeletionTask.
|
||||||
|
*/
|
||||||
|
public Path getSubDir() {
|
||||||
|
return this.subDir;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the base directories containing the subdirectory.
|
||||||
|
*
|
||||||
|
* @return the base directories for the FileDeletionTask.
|
||||||
|
*/
|
||||||
|
public List<Path> getBaseDirs() {
|
||||||
|
return this.baseDirs;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete the specified file/directory as the specified user.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
String msg = String.format("Running DeletionTask : %s", toString());
|
||||||
|
LOG.debug(msg);
|
||||||
|
}
|
||||||
|
boolean error = false;
|
||||||
|
if (null == getUser()) {
|
||||||
|
if (baseDirs == null || baseDirs.size() == 0) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("NM deleting absolute path : " + subDir);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
lfs.delete(subDir, true);
|
||||||
|
} catch (IOException e) {
|
||||||
|
error = true;
|
||||||
|
LOG.warn("Failed to delete " + subDir);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for (Path baseDir : baseDirs) {
|
||||||
|
Path del = subDir == null? baseDir : new Path(baseDir, subDir);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("NM deleting path : " + del);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
lfs.delete(del, true);
|
||||||
|
} catch (IOException e) {
|
||||||
|
error = true;
|
||||||
|
LOG.warn("Failed to delete " + subDir);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(
|
||||||
|
"Deleting path: [" + subDir + "] as user: [" + getUser() + "]");
|
||||||
|
}
|
||||||
|
if (baseDirs == null || baseDirs.size() == 0) {
|
||||||
|
getDeletionService().getContainerExecutor().deleteAsUser(
|
||||||
|
new DeletionAsUserContext.Builder()
|
||||||
|
.setUser(getUser())
|
||||||
|
.setSubDir(subDir)
|
||||||
|
.build());
|
||||||
|
} else {
|
||||||
|
getDeletionService().getContainerExecutor().deleteAsUser(
|
||||||
|
new DeletionAsUserContext.Builder()
|
||||||
|
.setUser(getUser())
|
||||||
|
.setSubDir(subDir)
|
||||||
|
.setBasedirs(baseDirs.toArray(new Path[0]))
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
} catch (IOException|InterruptedException e) {
|
||||||
|
error = true;
|
||||||
|
LOG.warn("Failed to delete as user " + getUser(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (error) {
|
||||||
|
setSuccess(!error);
|
||||||
|
}
|
||||||
|
deletionTaskFinished();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert the FileDeletionTask to a String representation.
|
||||||
|
*
|
||||||
|
* @return String representation of the FileDeletionTask.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
StringBuilder sb = new StringBuilder("FileDeletionTask :");
|
||||||
|
sb.append(" id : ").append(getTaskId());
|
||||||
|
sb.append(" user : ").append(getUser());
|
||||||
|
sb.append(" subDir : ").append(
|
||||||
|
subDir == null ? "null" : subDir.toString());
|
||||||
|
sb.append(" baseDir : ");
|
||||||
|
if (baseDirs == null || baseDirs.size() == 0) {
|
||||||
|
sb.append("null");
|
||||||
|
} else {
|
||||||
|
for (Path baseDir : baseDirs) {
|
||||||
|
sb.append(baseDir.toString()).append(',');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return sb.toString().trim();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert the FileDeletionTask to the Protobuf representation for storing
|
||||||
|
* in the state store and recovery.
|
||||||
|
*
|
||||||
|
* @return the protobuf representation of the FileDeletionTask.
|
||||||
|
*/
|
||||||
|
public DeletionServiceDeleteTaskProto convertDeletionTaskToProto() {
|
||||||
|
DeletionServiceDeleteTaskProto.Builder builder =
|
||||||
|
getBaseDeletionTaskProtoBuilder();
|
||||||
|
builder.setTaskType(DeletionTaskType.FILE.name());
|
||||||
|
if (getSubDir() != null) {
|
||||||
|
builder.setSubdir(getSubDir().toString());
|
||||||
|
}
|
||||||
|
if (getBaseDirs() != null) {
|
||||||
|
for (Path dir : getBaseDirs()) {
|
||||||
|
builder.addBasedirs(dir.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,25 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
/**
|
||||||
|
* Package containing DeletionTasks for use with the DeletionService.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
|
||||||
|
@ -113,9 +114,9 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
|
||||||
this.useLocalCacheDirectoryManager = useLocalCacheDirectoryManager;
|
this.useLocalCacheDirectoryManager = useLocalCacheDirectoryManager;
|
||||||
if (this.useLocalCacheDirectoryManager) {
|
if (this.useLocalCacheDirectoryManager) {
|
||||||
directoryManagers =
|
directoryManagers =
|
||||||
new ConcurrentHashMap<Path, LocalCacheDirectoryManager>();
|
new ConcurrentHashMap<>();
|
||||||
inProgressLocalResourcesMap =
|
inProgressLocalResourcesMap =
|
||||||
new ConcurrentHashMap<LocalResourceRequest, Path>();
|
new ConcurrentHashMap<>();
|
||||||
}
|
}
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.stateStore = stateStore;
|
this.stateStore = stateStore;
|
||||||
|
@ -393,7 +394,9 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
|
||||||
return false;
|
return false;
|
||||||
} else { // ResourceState is LOCALIZED or INIT
|
} else { // ResourceState is LOCALIZED or INIT
|
||||||
if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
|
if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
|
||||||
delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
|
FileDeletionTask deletionTask = new FileDeletionTask(delService,
|
||||||
|
getUser(), getPathToDelete(rsrc.getLocalPath()), null);
|
||||||
|
delService.delete(deletionTask);
|
||||||
}
|
}
|
||||||
removeResource(rem.getRequest());
|
removeResource(rem.getRequest());
|
||||||
LOG.info("Removed " + rsrc.getLocalPath() + " from localized cache");
|
LOG.info("Removed " + rsrc.getLocalPath() + " from localized cache");
|
||||||
|
@ -488,7 +491,9 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
|
||||||
LOG.warn("Directory " + uniquePath + " already exists, " +
|
LOG.warn("Directory " + uniquePath + " already exists, " +
|
||||||
"try next one.");
|
"try next one.");
|
||||||
if (delService != null) {
|
if (delService != null) {
|
||||||
delService.delete(getUser(), uniquePath);
|
FileDeletionTask deletionTask = new FileDeletionTask(delService,
|
||||||
|
getUser(), uniquePath, null);
|
||||||
|
delService.delete(deletionTask);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -95,7 +95,6 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Localize
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener;
|
import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
|
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
|
||||||
|
@ -113,6 +112,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheCleaner.LocalCacheCleanerStats;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheCleaner.LocalCacheCleanerStats;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
|
||||||
|
@ -604,7 +604,9 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
private void submitDirForDeletion(String userName, Path dir) {
|
private void submitDirForDeletion(String userName, Path dir) {
|
||||||
try {
|
try {
|
||||||
lfs.getFileStatus(dir);
|
lfs.getFileStatus(dir);
|
||||||
delService.delete(userName, dir, new Path[] {});
|
FileDeletionTask deletionTask = new FileDeletionTask(delService, userName,
|
||||||
|
dir, null);
|
||||||
|
delService.delete(deletionTask);
|
||||||
} catch (UnsupportedFileSystemException ue) {
|
} catch (UnsupportedFileSystemException ue) {
|
||||||
LOG.warn("Local dir " + dir + " is an unsupported filesystem", ue);
|
LOG.warn("Local dir " + dir + " is an unsupported filesystem", ue);
|
||||||
} catch (IOException ie) {
|
} catch (IOException ie) {
|
||||||
|
@ -1234,10 +1236,13 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
event.getResource().unlock();
|
event.getResource().unlock();
|
||||||
}
|
}
|
||||||
if (!paths.isEmpty()) {
|
if (!paths.isEmpty()) {
|
||||||
delService.delete(context.getUser(),
|
FileDeletionTask deletionTask = new FileDeletionTask(delService,
|
||||||
null, paths.toArray(new Path[paths.size()]));
|
context.getUser(), null, paths);
|
||||||
|
delService.delete(deletionTask);
|
||||||
}
|
}
|
||||||
delService.delete(null, nmPrivateCTokensPath, new Path[] {});
|
FileDeletionTask deletionTask = new FileDeletionTask(delService, null,
|
||||||
|
nmPrivateCTokensPath, null);
|
||||||
|
delService.delete(deletionTask);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1456,7 +1461,9 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
String appName = fileStatus.getPath().getName();
|
String appName = fileStatus.getPath().getName();
|
||||||
if (appName.matches("^application_\\d+_\\d+_DEL_\\d+$")) {
|
if (appName.matches("^application_\\d+_\\d+_DEL_\\d+$")) {
|
||||||
LOG.info("delete app log dir," + appName);
|
LOG.info("delete app log dir," + appName);
|
||||||
del.delete(null, fileStatus.getPath());
|
FileDeletionTask deletionTask = new FileDeletionTask(del, null,
|
||||||
|
fileStatus.getPath(), null);
|
||||||
|
del.delete(deletionTask);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1516,7 +1523,9 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
||
|
||
|
||||||
status.getPath().getName()
|
status.getPath().getName()
|
||||||
.matches(".*" + ContainerLocalizer.FILECACHE + "_DEL_.*")) {
|
.matches(".*" + ContainerLocalizer.FILECACHE + "_DEL_.*")) {
|
||||||
del.delete(null, status.getPath(), new Path[] {});
|
FileDeletionTask deletionTask = new FileDeletionTask(del, null,
|
||||||
|
status.getPath(), null);
|
||||||
|
del.delete(deletionTask);
|
||||||
}
|
}
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
// Do nothing, just give the warning
|
// Do nothing, just give the warning
|
||||||
|
@ -1530,24 +1539,25 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
private void cleanUpFilesPerUserDir(FileContext lfs, DeletionService del,
|
private void cleanUpFilesPerUserDir(FileContext lfs, DeletionService del,
|
||||||
Path userDirPath) throws IOException {
|
Path userDirPath) throws IOException {
|
||||||
RemoteIterator<FileStatus> userDirStatus = lfs.listStatus(userDirPath);
|
RemoteIterator<FileStatus> userDirStatus = lfs.listStatus(userDirPath);
|
||||||
FileDeletionTask dependentDeletionTask =
|
FileDeletionTask dependentDeletionTask = new FileDeletionTask(del, null,
|
||||||
del.createFileDeletionTask(null, userDirPath, new Path[] {});
|
userDirPath, new ArrayList<Path>());
|
||||||
if (userDirStatus != null && userDirStatus.hasNext()) {
|
if (userDirStatus != null && userDirStatus.hasNext()) {
|
||||||
List<FileDeletionTask> deletionTasks = new ArrayList<FileDeletionTask>();
|
List<FileDeletionTask> deletionTasks = new ArrayList<FileDeletionTask>();
|
||||||
while (userDirStatus.hasNext()) {
|
while (userDirStatus.hasNext()) {
|
||||||
FileStatus status = userDirStatus.next();
|
FileStatus status = userDirStatus.next();
|
||||||
String owner = status.getOwner();
|
String owner = status.getOwner();
|
||||||
FileDeletionTask deletionTask =
|
List<Path> pathList = new ArrayList<>();
|
||||||
del.createFileDeletionTask(owner, null,
|
pathList.add(status.getPath());
|
||||||
new Path[] { status.getPath() });
|
FileDeletionTask deletionTask = new FileDeletionTask(del, owner, null,
|
||||||
deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
|
pathList);
|
||||||
|
deletionTask.addDeletionTaskDependency(dependentDeletionTask);
|
||||||
deletionTasks.add(deletionTask);
|
deletionTasks.add(deletionTask);
|
||||||
}
|
}
|
||||||
for (FileDeletionTask task : deletionTasks) {
|
for (FileDeletionTask task : deletionTasks) {
|
||||||
del.scheduleFileDeletionTask(task);
|
del.delete(task);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
del.scheduleFileDeletionTask(dependentDeletionTask);
|
del.delete(dependentDeletionTask);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -69,6 +69,8 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.apache.hadoop.yarn.util.Times;
|
import org.apache.hadoop.yarn.util.Times;
|
||||||
|
@ -258,19 +260,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
addCredentials();
|
||||||
Credentials systemCredentials =
|
|
||||||
context.getSystemCredentialsForApps().get(appId);
|
|
||||||
if (systemCredentials != null) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Adding new framework-token for " + appId
|
|
||||||
+ " for log-aggregation: " + systemCredentials.getAllTokens()
|
|
||||||
+ "; userUgi=" + userUgi);
|
|
||||||
}
|
|
||||||
// this will replace old token
|
|
||||||
userUgi.addCredentials(systemCredentials);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a set of Containers whose logs will be uploaded in this cycle.
|
// Create a set of Containers whose logs will be uploaded in this cycle.
|
||||||
// It includes:
|
// It includes:
|
||||||
|
@ -332,9 +322,12 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
finishedContainers.contains(container));
|
finishedContainers.contains(container));
|
||||||
if (uploadedFilePathsInThisCycle.size() > 0) {
|
if (uploadedFilePathsInThisCycle.size() > 0) {
|
||||||
uploadedLogsInThisCycle = true;
|
uploadedLogsInThisCycle = true;
|
||||||
this.delService.delete(this.userUgi.getShortUserName(), null,
|
List<Path> uploadedFilePathsInThisCycleList = new ArrayList<>();
|
||||||
uploadedFilePathsInThisCycle
|
uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle);
|
||||||
.toArray(new Path[uploadedFilePathsInThisCycle.size()]));
|
DeletionTask deletionTask = new FileDeletionTask(delService,
|
||||||
|
this.userUgi.getShortUserName(), null,
|
||||||
|
uploadedFilePathsInThisCycleList);
|
||||||
|
delService.delete(deletionTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
// This container is finished, and all its logs have been uploaded,
|
// This container is finished, and all its logs have been uploaded,
|
||||||
|
@ -352,11 +345,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
}
|
}
|
||||||
|
|
||||||
long currentTime = System.currentTimeMillis();
|
long currentTime = System.currentTimeMillis();
|
||||||
final Path renamedPath = this.rollingMonitorInterval <= 0
|
final Path renamedPath = getRenamedPath(currentTime);
|
||||||
? remoteNodeLogFileForApp : new Path(
|
|
||||||
remoteNodeLogFileForApp.getParent(),
|
|
||||||
remoteNodeLogFileForApp.getName() + "_"
|
|
||||||
+ currentTime);
|
|
||||||
|
|
||||||
final boolean rename = uploadedLogsInThisCycle;
|
final boolean rename = uploadedLogsInThisCycle;
|
||||||
try {
|
try {
|
||||||
|
@ -396,6 +385,28 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Path getRenamedPath(long currentTime) {
|
||||||
|
return this.rollingMonitorInterval <= 0 ? remoteNodeLogFileForApp
|
||||||
|
: new Path(remoteNodeLogFileForApp.getParent(),
|
||||||
|
remoteNodeLogFileForApp.getName() + "_" + currentTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addCredentials() {
|
||||||
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
Credentials systemCredentials =
|
||||||
|
context.getSystemCredentialsForApps().get(appId);
|
||||||
|
if (systemCredentials != null) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Adding new framework-token for " + appId
|
||||||
|
+ " for log-aggregation: " + systemCredentials.getAllTokens()
|
||||||
|
+ "; userUgi=" + userUgi);
|
||||||
|
}
|
||||||
|
// this will replace old token
|
||||||
|
userUgi.addCredentials(systemCredentials);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected LogWriter createLogWriter() {
|
protected LogWriter createLogWriter() {
|
||||||
return new LogWriter();
|
return new LogWriter();
|
||||||
|
@ -561,8 +572,11 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (localAppLogDirs.size() > 0) {
|
if (localAppLogDirs.size() > 0) {
|
||||||
this.delService.delete(this.userUgi.getShortUserName(), null,
|
List<Path> localAppLogDirsList = new ArrayList<>();
|
||||||
localAppLogDirs.toArray(new Path[localAppLogDirs.size()]));
|
localAppLogDirsList.addAll(localAppLogDirs);
|
||||||
|
DeletionTask deletionTask = new FileDeletionTask(delService,
|
||||||
|
this.userUgi.getShortUserName(), null, localAppLogDirsList);
|
||||||
|
this.delService.delete(deletionTask);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
|
||||||
|
@ -247,8 +248,10 @@ public class NonAggregatingLogHandler extends AbstractService implements
|
||||||
new ApplicationEvent(this.applicationId,
|
new ApplicationEvent(this.applicationId,
|
||||||
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
|
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
|
||||||
if (localAppLogDirs.size() > 0) {
|
if (localAppLogDirs.size() > 0) {
|
||||||
NonAggregatingLogHandler.this.delService.delete(user, null,
|
FileDeletionTask deletionTask = new FileDeletionTask(
|
||||||
(Path[]) localAppLogDirs.toArray(new Path[localAppLogDirs.size()]));
|
NonAggregatingLogHandler.this.delService, user, null,
|
||||||
|
localAppLogDirs);
|
||||||
|
NonAggregatingLogHandler.this.delService.delete(deletionTask);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
NonAggregatingLogHandler.this.stateStore.removeLogDeleter(
|
NonAggregatingLogHandler.this.stateStore.removeLogDeleter(
|
||||||
|
|
|
@ -40,6 +40,7 @@ message DeletionServiceDeleteTaskProto {
|
||||||
optional int64 deletionTime = 4;
|
optional int64 deletionTime = 4;
|
||||||
repeated string basedirs = 5;
|
repeated string basedirs = 5;
|
||||||
repeated int32 successorIds = 6;
|
repeated int32 successorIds = 6;
|
||||||
|
optional string taskType = 7;
|
||||||
}
|
}
|
||||||
|
|
||||||
message LocalizedResourceProto {
|
message LocalizedResourceProto {
|
||||||
|
|
|
@ -33,13 +33,14 @@ import org.apache.hadoop.fs.FileContext;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
|
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
|
||||||
public class TestDeletionService {
|
public class TestDeletionService {
|
||||||
|
|
||||||
private static final FileContext lfs = getLfs();
|
private static final FileContext lfs = getLfs();
|
||||||
|
@ -123,8 +124,9 @@ public class TestDeletionService {
|
||||||
del.start();
|
del.start();
|
||||||
try {
|
try {
|
||||||
for (Path p : dirs) {
|
for (Path p : dirs) {
|
||||||
del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo",
|
FileDeletionTask deletionTask = new FileDeletionTask(del,
|
||||||
p, null);
|
(Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p, null);
|
||||||
|
del.delete(deletionTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
int msecToWait = 20 * 1000;
|
int msecToWait = 20 * 1000;
|
||||||
|
@ -159,8 +161,10 @@ public class TestDeletionService {
|
||||||
del.start();
|
del.start();
|
||||||
for (Path p : content) {
|
for (Path p : content) {
|
||||||
assertTrue(lfs.util().exists(new Path(baseDirs.get(0), p)));
|
assertTrue(lfs.util().exists(new Path(baseDirs.get(0), p)));
|
||||||
del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo",
|
FileDeletionTask deletionTask = new FileDeletionTask(del,
|
||||||
p, baseDirs.toArray(new Path[4]));
|
(Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p,
|
||||||
|
baseDirs);
|
||||||
|
del.delete(deletionTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
int msecToWait = 20 * 1000;
|
int msecToWait = 20 * 1000;
|
||||||
|
@ -196,8 +200,9 @@ public class TestDeletionService {
|
||||||
del.init(conf);
|
del.init(conf);
|
||||||
del.start();
|
del.start();
|
||||||
for (Path p : dirs) {
|
for (Path p : dirs) {
|
||||||
del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p,
|
FileDeletionTask deletionTask = new FileDeletionTask(del,
|
||||||
null);
|
(Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p, null);
|
||||||
|
del.delete(deletionTask);
|
||||||
}
|
}
|
||||||
int msecToWait = 20 * 1000;
|
int msecToWait = 20 * 1000;
|
||||||
for (Path p : dirs) {
|
for (Path p : dirs) {
|
||||||
|
@ -220,7 +225,9 @@ public class TestDeletionService {
|
||||||
try {
|
try {
|
||||||
del.init(conf);
|
del.init(conf);
|
||||||
del.start();
|
del.start();
|
||||||
del.delete("dingo", new Path("/does/not/exist"));
|
FileDeletionTask deletionTask = new FileDeletionTask(del, "dingo",
|
||||||
|
new Path("/does/not/exist"), null);
|
||||||
|
del.delete(deletionTask);
|
||||||
} finally {
|
} finally {
|
||||||
del.stop();
|
del.stop();
|
||||||
}
|
}
|
||||||
|
@ -247,18 +254,20 @@ public class TestDeletionService {
|
||||||
// first we will try to delete sub directories which are present. This
|
// first we will try to delete sub directories which are present. This
|
||||||
// should then trigger parent directory to be deleted.
|
// should then trigger parent directory to be deleted.
|
||||||
List<Path> subDirs = buildDirs(r, dirs.get(0), 2);
|
List<Path> subDirs = buildDirs(r, dirs.get(0), 2);
|
||||||
|
|
||||||
FileDeletionTask dependentDeletionTask =
|
FileDeletionTask dependentDeletionTask =
|
||||||
del.createFileDeletionTask(null, dirs.get(0), new Path[] {});
|
new FileDeletionTask(del, null, dirs.get(0), new ArrayList<Path>());
|
||||||
List<FileDeletionTask> deletionTasks = new ArrayList<FileDeletionTask>();
|
List<FileDeletionTask> deletionTasks = new ArrayList<FileDeletionTask>();
|
||||||
for (Path subDir : subDirs) {
|
for (Path subDir : subDirs) {
|
||||||
|
List<Path> subDirList = new ArrayList<>();
|
||||||
|
subDirList.add(subDir);
|
||||||
FileDeletionTask deletionTask =
|
FileDeletionTask deletionTask =
|
||||||
del.createFileDeletionTask(null, null, new Path[] { subDir });
|
new FileDeletionTask(del, null, dirs.get(0), subDirList);
|
||||||
deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
|
deletionTask.addDeletionTaskDependency(dependentDeletionTask);
|
||||||
deletionTasks.add(deletionTask);
|
deletionTasks.add(deletionTask);
|
||||||
}
|
}
|
||||||
for (FileDeletionTask task : deletionTasks) {
|
for (FileDeletionTask task : deletionTasks) {
|
||||||
del.scheduleFileDeletionTask(task);
|
del.delete(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
int msecToWait = 20 * 1000;
|
int msecToWait = 20 * 1000;
|
||||||
|
@ -274,19 +283,21 @@ public class TestDeletionService {
|
||||||
subDirs = buildDirs(r, dirs.get(1), 2);
|
subDirs = buildDirs(r, dirs.get(1), 2);
|
||||||
subDirs.add(new Path(dirs.get(1), "absentFile"));
|
subDirs.add(new Path(dirs.get(1), "absentFile"));
|
||||||
|
|
||||||
dependentDeletionTask =
|
dependentDeletionTask = new FileDeletionTask(del, null, dirs.get(1),
|
||||||
del.createFileDeletionTask(null, dirs.get(1), new Path[] {});
|
new ArrayList<Path>());
|
||||||
deletionTasks = new ArrayList<FileDeletionTask>();
|
deletionTasks = new ArrayList<FileDeletionTask>();
|
||||||
for (Path subDir : subDirs) {
|
for (Path subDir : subDirs) {
|
||||||
FileDeletionTask deletionTask =
|
List<Path> subDirList = new ArrayList<>();
|
||||||
del.createFileDeletionTask(null, null, new Path[] { subDir });
|
subDirList.add(subDir);
|
||||||
deletionTask.addFileDeletionTaskDependency(dependentDeletionTask);
|
FileDeletionTask deletionTask = new FileDeletionTask(del, null, null,
|
||||||
|
subDirList);
|
||||||
|
deletionTask.addDeletionTaskDependency(dependentDeletionTask);
|
||||||
deletionTasks.add(deletionTask);
|
deletionTasks.add(deletionTask);
|
||||||
}
|
}
|
||||||
// marking one of the tasks as a failure.
|
// marking one of the tasks as a failure.
|
||||||
deletionTasks.get(2).setSuccess(false);
|
deletionTasks.get(2).setSuccess(false);
|
||||||
for (FileDeletionTask task : deletionTasks) {
|
for (FileDeletionTask task : deletionTasks) {
|
||||||
del.scheduleFileDeletionTask(task);
|
del.delete(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
msecToWait = 20 * 1000;
|
msecToWait = 20 * 1000;
|
||||||
|
@ -327,8 +338,10 @@ public class TestDeletionService {
|
||||||
del.start();
|
del.start();
|
||||||
for (Path p : content) {
|
for (Path p : content) {
|
||||||
assertTrue(lfs.util().exists(new Path(baseDirs.get(0), p)));
|
assertTrue(lfs.util().exists(new Path(baseDirs.get(0), p)));
|
||||||
del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo",
|
FileDeletionTask deletionTask = new FileDeletionTask(del,
|
||||||
p, baseDirs.toArray(new Path[4]));
|
(Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p,
|
||||||
|
baseDirs);
|
||||||
|
del.delete(deletionTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
// restart the deletion service
|
// restart the deletion service
|
||||||
|
@ -341,8 +354,10 @@ public class TestDeletionService {
|
||||||
// verify paths are still eventually deleted
|
// verify paths are still eventually deleted
|
||||||
int msecToWait = 10 * 1000;
|
int msecToWait = 10 * 1000;
|
||||||
for (Path p : baseDirs) {
|
for (Path p : baseDirs) {
|
||||||
|
System.out.println("TEST Basedir: " + p.getName());
|
||||||
for (Path q : content) {
|
for (Path q : content) {
|
||||||
Path fp = new Path(p, q);
|
Path fp = new Path(p, q);
|
||||||
|
System.out.println("TEST Path: " + fp.toString());
|
||||||
while (msecToWait > 0 && lfs.util().exists(fp)) {
|
while (msecToWait > 0 && lfs.util().exists(fp)) {
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
msecToWait -= 100;
|
msecToWait -= 100;
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
package org.apache.hadoop.yarn.server.nodemanager;
|
package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
import static org.mockito.Matchers.argThat;
|
import static org.mockito.Matchers.argThat;
|
||||||
import static org.mockito.Matchers.isNull;
|
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
@ -28,6 +27,7 @@ import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -58,19 +58,17 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionMatcher;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.ArgumentMatcher;
|
|
||||||
|
|
||||||
public class TestNodeManagerReboot {
|
public class TestNodeManagerReboot {
|
||||||
|
|
||||||
|
@ -195,19 +193,18 @@ public class TestNodeManagerReboot {
|
||||||
// restart the NodeManager
|
// restart the NodeManager
|
||||||
restartNM(MAX_TRIES);
|
restartNM(MAX_TRIES);
|
||||||
checkNumOfLocalDirs();
|
checkNumOfLocalDirs();
|
||||||
|
|
||||||
verify(delService, times(1)).delete(
|
verify(delService, times(1)).delete(argThat(new FileDeletionMatcher(
|
||||||
(String) isNull(),
|
delService, null,
|
||||||
argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR
|
new Path(ResourceLocalizationService.NM_PRIVATE_DIR + "_DEL_"), null)));
|
||||||
+ "_DEL_")));
|
verify(delService, times(1)).delete(argThat(new FileDeletionMatcher(
|
||||||
verify(delService, times(1)).delete((String) isNull(),
|
delService, null, new Path(ContainerLocalizer.FILECACHE + "_DEL_"),
|
||||||
argThat(new PathInclude(ContainerLocalizer.FILECACHE + "_DEL_")));
|
null)));
|
||||||
verify(delService, times(1)).scheduleFileDeletionTask(
|
verify(delService, times(1)).delete(argThat(new FileDeletionMatcher(
|
||||||
argThat(new FileDeletionInclude(user, null,
|
delService, user, null, Arrays.asList(new Path(destinationFile)))));
|
||||||
new String[] { destinationFile })));
|
verify(delService, times(1)).delete(argThat(new FileDeletionMatcher(
|
||||||
verify(delService, times(1)).scheduleFileDeletionTask(
|
delService, null, new Path(ContainerLocalizer.USERCACHE + "_DEL_"),
|
||||||
argThat(new FileDeletionInclude(null, ContainerLocalizer.USERCACHE
|
new ArrayList<Path>())));
|
||||||
+ "_DEL_", new String[] {})));
|
|
||||||
|
|
||||||
// restart the NodeManager again
|
// restart the NodeManager again
|
||||||
// this time usercache directory should be empty
|
// this time usercache directory should be empty
|
||||||
|
@ -329,72 +326,4 @@ public class TestNodeManagerReboot {
|
||||||
return conf;
|
return conf;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class PathInclude extends ArgumentMatcher<Path> {
|
|
||||||
|
|
||||||
final String part;
|
|
||||||
|
|
||||||
PathInclude(String part) {
|
|
||||||
this.part = part;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean matches(Object o) {
|
|
||||||
return ((Path) o).getName().indexOf(part) != -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class FileDeletionInclude extends ArgumentMatcher<FileDeletionTask> {
|
|
||||||
final String user;
|
|
||||||
final String subDirIncludes;
|
|
||||||
final String[] baseDirIncludes;
|
|
||||||
|
|
||||||
public FileDeletionInclude(String user, String subDirIncludes,
|
|
||||||
String [] baseDirIncludes) {
|
|
||||||
this.user = user;
|
|
||||||
this.subDirIncludes = subDirIncludes;
|
|
||||||
this.baseDirIncludes = baseDirIncludes;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean matches(Object o) {
|
|
||||||
FileDeletionTask fd = (FileDeletionTask)o;
|
|
||||||
if (fd.getUser() == null && user != null) {
|
|
||||||
return false;
|
|
||||||
} else if (fd.getUser() != null && user == null) {
|
|
||||||
return false;
|
|
||||||
} else if (fd.getUser() != null && user != null) {
|
|
||||||
return fd.getUser().equals(user);
|
|
||||||
}
|
|
||||||
if (!comparePaths(fd.getSubDir(), subDirIncludes)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (baseDirIncludes == null && fd.getBaseDirs() != null) {
|
|
||||||
return false;
|
|
||||||
} else if (baseDirIncludes != null && fd.getBaseDirs() == null ) {
|
|
||||||
return false;
|
|
||||||
} else if (baseDirIncludes != null && fd.getBaseDirs() != null) {
|
|
||||||
if (baseDirIncludes.length != fd.getBaseDirs().size()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
for (int i =0 ; i < baseDirIncludes.length; i++) {
|
|
||||||
if (!comparePaths(fd.getBaseDirs().get(i), baseDirIncludes[i])) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean comparePaths(Path p1, String p2) {
|
|
||||||
if (p1 == null && p2 != null){
|
|
||||||
return false;
|
|
||||||
} else if (p1 != null && p2 == null) {
|
|
||||||
return false;
|
|
||||||
} else if (p1 != null && p2 != null ){
|
|
||||||
return p1.toUri().getPath().contains(p2.toString());
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,91 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.api.impl.pb;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.recovery.DeletionTaskRecoveryInfo;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTaskType;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test conversion to {@link DeletionTask}.
|
||||||
|
*/
|
||||||
|
public class TestNMProtoUtils {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConvertProtoToDeletionTask() throws Exception {
|
||||||
|
DeletionService deletionService = mock(DeletionService.class);
|
||||||
|
DeletionServiceDeleteTaskProto.Builder protoBuilder =
|
||||||
|
DeletionServiceDeleteTaskProto.newBuilder();
|
||||||
|
int id = 0;
|
||||||
|
protoBuilder.setId(id);
|
||||||
|
DeletionServiceDeleteTaskProto proto = protoBuilder.build();
|
||||||
|
DeletionTask deletionTask =
|
||||||
|
NMProtoUtils.convertProtoToDeletionTask(proto, deletionService);
|
||||||
|
assertEquals(DeletionTaskType.FILE, deletionTask.getDeletionTaskType());
|
||||||
|
assertEquals(id, deletionTask.getTaskId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConvertProtoToFileDeletionTask() throws Exception {
|
||||||
|
DeletionService deletionService = mock(DeletionService.class);
|
||||||
|
int id = 0;
|
||||||
|
String user = "user";
|
||||||
|
Path subdir = new Path("subdir");
|
||||||
|
Path basedir = new Path("basedir");
|
||||||
|
DeletionServiceDeleteTaskProto.Builder protoBuilder =
|
||||||
|
DeletionServiceDeleteTaskProto.newBuilder();
|
||||||
|
protoBuilder
|
||||||
|
.setId(id)
|
||||||
|
.setUser("user")
|
||||||
|
.setSubdir(subdir.getName())
|
||||||
|
.addBasedirs(basedir.getName());
|
||||||
|
DeletionServiceDeleteTaskProto proto = protoBuilder.build();
|
||||||
|
DeletionTask deletionTask =
|
||||||
|
NMProtoUtils.convertProtoToFileDeletionTask(proto, deletionService, id);
|
||||||
|
assertEquals(DeletionTaskType.FILE.name(),
|
||||||
|
deletionTask.getDeletionTaskType().name());
|
||||||
|
assertEquals(id, deletionTask.getTaskId());
|
||||||
|
assertEquals(subdir, ((FileDeletionTask) deletionTask).getSubDir());
|
||||||
|
assertEquals(basedir,
|
||||||
|
((FileDeletionTask) deletionTask).getBaseDirs().get(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConvertProtoToDeletionTaskRecoveryInfo() throws Exception {
|
||||||
|
long delTime = System.currentTimeMillis();
|
||||||
|
List<Integer> successorTaskIds = Arrays.asList(1);
|
||||||
|
DeletionTask deletionTask = mock(DeletionTask.class);
|
||||||
|
DeletionTaskRecoveryInfo info =
|
||||||
|
new DeletionTaskRecoveryInfo(deletionTask, successorTaskIds, delTime);
|
||||||
|
assertEquals(deletionTask, info.getTask());
|
||||||
|
assertEquals(successorTaskIds, info.getSuccessorTaskIds());
|
||||||
|
assertEquals(delTime, info.getDeletionTimestamp());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
|
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||||
|
@ -260,10 +261,10 @@ public abstract class BaseContainerManagerTest {
|
||||||
protected DeletionService createDeletionService() {
|
protected DeletionService createDeletionService() {
|
||||||
return new DeletionService(exec) {
|
return new DeletionService(exec) {
|
||||||
@Override
|
@Override
|
||||||
public void delete(String user, Path subDir, Path... baseDirs) {
|
public void delete(DeletionTask deletionTask) {
|
||||||
// Don't do any deletions.
|
// Don't do any deletions.
|
||||||
LOG.info("Psuedo delete: user - " + user + ", subDir - " + subDir
|
LOG.info("Psuedo delete: user - " + user
|
||||||
+ ", baseDirs - " + Arrays.asList(baseDirs));
|
+ ", type - " + deletionTask.getDeletionTaskType());
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,84 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
|
import org.mockito.ArgumentMatcher;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ArgumentMatcher to check the arguments of the {@link FileDeletionTask}.
|
||||||
|
*/
|
||||||
|
public class FileDeletionMatcher extends ArgumentMatcher<FileDeletionTask> {
|
||||||
|
|
||||||
|
private final DeletionService delService;
|
||||||
|
private final String user;
|
||||||
|
private final Path subDirIncludes;
|
||||||
|
private final List<Path> baseDirIncludes;
|
||||||
|
|
||||||
|
public FileDeletionMatcher(DeletionService delService, String user,
|
||||||
|
Path subDirIncludes, List<Path> baseDirIncludes) {
|
||||||
|
this.delService = delService;
|
||||||
|
this.user = user;
|
||||||
|
this.subDirIncludes = subDirIncludes;
|
||||||
|
this.baseDirIncludes = baseDirIncludes;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean matches(Object o) {
|
||||||
|
FileDeletionTask fd = (FileDeletionTask) o;
|
||||||
|
if (fd.getUser() == null && user != null) {
|
||||||
|
return false;
|
||||||
|
} else if (fd.getUser() != null && user == null) {
|
||||||
|
return false;
|
||||||
|
} else if (fd.getUser() != null && user != null) {
|
||||||
|
return fd.getUser().equals(user);
|
||||||
|
}
|
||||||
|
if (!comparePaths(fd.getSubDir(), subDirIncludes.getName())) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (baseDirIncludes == null && fd.getBaseDirs() != null) {
|
||||||
|
return false;
|
||||||
|
} else if (baseDirIncludes != null && fd.getBaseDirs() == null) {
|
||||||
|
return false;
|
||||||
|
} else if (baseDirIncludes != null && fd.getBaseDirs() != null) {
|
||||||
|
if (baseDirIncludes.size() != fd.getBaseDirs().size()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
for (int i = 0; i < baseDirIncludes.size(); i++) {
|
||||||
|
if (!comparePaths(fd.getBaseDirs().get(i),
|
||||||
|
baseDirIncludes.get(i).getName())) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean comparePaths(Path p1, String p2) {
|
||||||
|
if (p1 == null && p2 != null) {
|
||||||
|
return false;
|
||||||
|
} else if (p1 != null && p2 == null) {
|
||||||
|
return false;
|
||||||
|
} else if (p1 != null && p2 != null) {
|
||||||
|
return p1.toUri().getPath().contains(p2.toString());
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,85 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test the attributes of the {@link FileDeletionTask} class.
|
||||||
|
*/
|
||||||
|
public class TestFileDeletionTask {
|
||||||
|
|
||||||
|
private static final int ID = 0;
|
||||||
|
private static final String USER = "user";
|
||||||
|
private static final Path SUBDIR = new Path("subdir");
|
||||||
|
private static final Path BASEDIR = new Path("basedir");
|
||||||
|
|
||||||
|
private List<Path> baseDirs = new ArrayList<>();
|
||||||
|
private DeletionService deletionService;
|
||||||
|
private FileDeletionTask deletionTask;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
deletionService = mock(DeletionService.class);
|
||||||
|
baseDirs.add(BASEDIR);
|
||||||
|
deletionTask = new FileDeletionTask(ID, deletionService, USER, SUBDIR,
|
||||||
|
baseDirs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
baseDirs.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetUser() throws Exception {
|
||||||
|
assertEquals(USER, deletionTask.getUser());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetSubDir() throws Exception {
|
||||||
|
assertEquals(SUBDIR, deletionTask.getSubDir());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetBaseDirs() throws Exception {
|
||||||
|
assertEquals(1, deletionTask.getBaseDirs().size());
|
||||||
|
assertEquals(baseDirs, deletionTask.getBaseDirs());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConvertDeletionTaskToProto() throws Exception {
|
||||||
|
DeletionServiceDeleteTaskProto proto =
|
||||||
|
deletionTask.convertDeletionTaskToProto();
|
||||||
|
assertEquals(ID, proto.getId());
|
||||||
|
assertEquals(USER, proto.getUser());
|
||||||
|
assertEquals(SUBDIR, new Path(proto.getSubdir()));
|
||||||
|
assertEquals(BASEDIR, new Path(proto.getBasedirs(0)));
|
||||||
|
assertEquals(1, proto.getBasedirsCount());
|
||||||
|
}
|
||||||
|
}
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
||||||
|
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Mockito.argThat;
|
||||||
import static org.mockito.Matchers.eq;
|
import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Matchers.isA;
|
import static org.mockito.Matchers.isA;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
@ -54,6 +55,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceLocalizedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceLocalizedEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionMatcher;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
|
||||||
|
@ -823,7 +825,8 @@ public class TestLocalResourcesTrackerImpl {
|
||||||
Path rPath = tracker.getPathForLocalization(req1, base_path,
|
Path rPath = tracker.getPathForLocalization(req1, base_path,
|
||||||
delService);
|
delService);
|
||||||
Assert.assertFalse(lfs.util().exists(rPath));
|
Assert.assertFalse(lfs.util().exists(rPath));
|
||||||
verify(delService, times(1)).delete(eq(user), eq(conflictPath));
|
verify(delService, times(1)).delete(argThat(new FileDeletionMatcher(
|
||||||
|
delService, user, conflictPath, null)));
|
||||||
} finally {
|
} finally {
|
||||||
lfs.delete(base_path, true);
|
lfs.delete(base_path, true);
|
||||||
if (dispatcher != null) {
|
if (dispatcher != null) {
|
||||||
|
|
|
@ -31,7 +31,6 @@ import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.argThat;
|
import static org.mockito.Matchers.argThat;
|
||||||
import static org.mockito.Matchers.eq;
|
import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Matchers.isA;
|
import static org.mockito.Matchers.isA;
|
||||||
import static org.mockito.Matchers.isNull;
|
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.doReturn;
|
import static org.mockito.Mockito.doReturn;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
@ -68,6 +67,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.Options;
|
import org.apache.hadoop.fs.Options;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionMatcher;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
|
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
|
@ -1067,7 +1067,8 @@ public class TestResourceLocalizationService {
|
||||||
verify(containerBus, times(3)).handle(argThat(matchesContainerLoc));
|
verify(containerBus, times(3)).handle(argThat(matchesContainerLoc));
|
||||||
|
|
||||||
// Verify deletion of localization token.
|
// Verify deletion of localization token.
|
||||||
verify(delService).delete((String)isNull(), eq(localizationTokenPath));
|
verify(delService, times(1)).delete(argThat(new FileDeletionMatcher(
|
||||||
|
delService, null, localizationTokenPath, null)));
|
||||||
} finally {
|
} finally {
|
||||||
spyService.stop();
|
spyService.stop();
|
||||||
dispatcher.stop();
|
dispatcher.stop();
|
||||||
|
@ -1341,8 +1342,8 @@ public class TestResourceLocalizationService {
|
||||||
Thread.sleep(50);
|
Thread.sleep(50);
|
||||||
}
|
}
|
||||||
// Verify if downloading resources were submitted for deletion.
|
// Verify if downloading resources were submitted for deletion.
|
||||||
verify(delService).delete(eq(user), (Path) eq(null),
|
verify(delService, times(2)).delete(argThat(new FileDeletionMatcher(
|
||||||
argThat(new DownloadingPathsMatcher(paths)));
|
delService, user, null, new ArrayList<>(paths))));
|
||||||
|
|
||||||
LocalResourcesTracker tracker = spyService.getLocalResourcesTracker(
|
LocalResourcesTracker tracker = spyService.getLocalResourcesTracker(
|
||||||
LocalResourceVisibility.PRIVATE, "user0", appId);
|
LocalResourceVisibility.PRIVATE, "user0", appId);
|
||||||
|
@ -2755,15 +2756,19 @@ public class TestResourceLocalizationService {
|
||||||
for (int i = 0; i < containerLocalDirs.size(); ++i) {
|
for (int i = 0; i < containerLocalDirs.size(); ++i) {
|
||||||
if (i == 2) {
|
if (i == 2) {
|
||||||
try {
|
try {
|
||||||
verify(delService).delete(user, containerLocalDirs.get(i));
|
verify(delService, times(1)).delete(argThat(new FileDeletionMatcher(
|
||||||
verify(delService).delete(null, nmLocalContainerDirs.get(i));
|
delService, user, containerLocalDirs.get(i), null)));
|
||||||
|
verify(delService, times(1)).delete(argThat(new FileDeletionMatcher(
|
||||||
|
delService, null, nmLocalContainerDirs.get(i), null)));
|
||||||
Assert.fail("deletion attempts for invalid dirs");
|
Assert.fail("deletion attempts for invalid dirs");
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
verify(delService).delete(user, containerLocalDirs.get(i));
|
verify(delService, times(1)).delete(argThat(new FileDeletionMatcher(
|
||||||
verify(delService).delete(null, nmLocalContainerDirs.get(i));
|
delService, user, containerLocalDirs.get(i), null)));
|
||||||
|
verify(delService, times(1)).delete(argThat(new FileDeletionMatcher(
|
||||||
|
delService, null, nmLocalContainerDirs.get(i), null)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2804,15 +2809,19 @@ public class TestResourceLocalizationService {
|
||||||
for (int i = 0; i < containerLocalDirs.size(); ++i) {
|
for (int i = 0; i < containerLocalDirs.size(); ++i) {
|
||||||
if (i == 3) {
|
if (i == 3) {
|
||||||
try {
|
try {
|
||||||
verify(delService).delete(user, containerLocalDirs.get(i));
|
verify(delService, times(1)).delete(argThat(new FileDeletionMatcher(
|
||||||
verify(delService).delete(null, nmLocalContainerDirs.get(i));
|
delService, user, containerLocalDirs.get(i), null)));
|
||||||
|
verify(delService, times(1)).delete(argThat(new FileDeletionMatcher(
|
||||||
|
delService, null, nmLocalContainerDirs.get(i), null)));
|
||||||
Assert.fail("deletion attempts for invalid dirs");
|
Assert.fail("deletion attempts for invalid dirs");
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
verify(delService).delete(user, appLocalDirs.get(i));
|
verify(delService, times(1)).delete(argThat(new FileDeletionMatcher(
|
||||||
verify(delService).delete(null, nmLocalAppDirs.get(i));
|
delService, user, containerLocalDirs.get(i), null)));
|
||||||
|
verify(delService, times(1)).delete(argThat(new FileDeletionMatcher(
|
||||||
|
delService, null, nmLocalContainerDirs.get(i), null)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -42,17 +42,16 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
|
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
|
||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
||||||
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.ArgumentCaptor;
|
import org.mockito.ArgumentCaptor;
|
||||||
import org.mockito.Matchers;
|
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
@ -311,16 +310,18 @@ public class TestAppLogAggregatorImpl {
|
||||||
@Override
|
@Override
|
||||||
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
|
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||||
Set<String> paths = new HashSet<>();
|
Set<String> paths = new HashSet<>();
|
||||||
Object[] args = invocationOnMock.getArguments();
|
Object[] tasks = invocationOnMock.getArguments();
|
||||||
for(int i = 2; i < args.length; i++) {
|
for(int i = 0; i < tasks.length; i++) {
|
||||||
Path path = (Path) args[i];
|
FileDeletionTask task = (FileDeletionTask) tasks[i];
|
||||||
paths.add(path.toUri().getRawPath());
|
for (Path path: task.getBaseDirs()) {
|
||||||
|
paths.add(path.toUri().getRawPath());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
verifyFilesToDelete(expectedPathsForDeletion, paths);
|
verifyFilesToDelete(expectedPathsForDeletion, paths);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}).doNothing().when(deletionServiceWithExpectedFiles).delete(
|
}).doNothing().when(deletionServiceWithExpectedFiles).delete(
|
||||||
any(String.class), any(Path.class), Matchers.<Path>anyVararg());
|
any(FileDeletionTask.class));
|
||||||
|
|
||||||
return deletionServiceWithExpectedFiles;
|
return deletionServiceWithExpectedFiles;
|
||||||
}
|
}
|
||||||
|
|
|
@ -120,6 +120,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionMatcher;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.TestNonAggregatingLogHandler;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.TestNonAggregatingLogHandler;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
|
||||||
|
@ -218,8 +220,10 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
// ensure filesystems were closed
|
// ensure filesystems were closed
|
||||||
verify(logAggregationService).closeFileSystems(
|
verify(logAggregationService).closeFileSystems(
|
||||||
any(UserGroupInformation.class));
|
any(UserGroupInformation.class));
|
||||||
verify(delSrvc).delete(eq(user), eq((Path) null),
|
List<Path> dirList = new ArrayList<>();
|
||||||
eq(new Path(app1LogDir.getAbsolutePath())));
|
dirList.add(new Path(app1LogDir.toURI()));
|
||||||
|
verify(delSrvc, times(2)).delete(argThat(new FileDeletionMatcher(
|
||||||
|
delSrvc, user, null, dirList)));
|
||||||
|
|
||||||
String containerIdStr = container11.toString();
|
String containerIdStr = container11.toString();
|
||||||
File containerLogDir = new File(app1LogDir, containerIdStr);
|
File containerLogDir = new File(app1LogDir, containerIdStr);
|
||||||
|
@ -333,7 +337,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
logAggregationService.stop();
|
logAggregationService.stop();
|
||||||
delSrvc.stop();
|
delSrvc.stop();
|
||||||
// Aggregated logs should not be deleted if not uploaded.
|
// Aggregated logs should not be deleted if not uploaded.
|
||||||
verify(delSrvc, times(0)).delete(user, null);
|
FileDeletionTask deletionTask = new FileDeletionTask(delSrvc, user, null,
|
||||||
|
null);
|
||||||
|
verify(delSrvc, times(0)).delete(deletionTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -815,8 +821,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||||
assertEquals(0, logAggregationService.getNumAggregators());
|
assertEquals(0, logAggregationService.getNumAggregators());
|
||||||
// local log dir shouldn't be deleted given log aggregation cannot
|
// local log dir shouldn't be deleted given log aggregation cannot
|
||||||
// continue due to aggregated log dir creation failure on remoteFS.
|
// continue due to aggregated log dir creation failure on remoteFS.
|
||||||
verify(spyDelSrvc, never()).delete(eq(user), any(Path.class),
|
FileDeletionTask deletionTask = new FileDeletionTask(spyDelSrvc, user,
|
||||||
Mockito.<Path>anyVararg());
|
null, null);
|
||||||
|
verify(spyDelSrvc, never()).delete(deletionTask);
|
||||||
verify(logAggregationService).closeFileSystems(
|
verify(logAggregationService).closeFileSystems(
|
||||||
any(UserGroupInformation.class));
|
any(UserGroupInformation.class));
|
||||||
// make sure local log dir is not deleted in case log aggregation
|
// make sure local log dir is not deleted in case log aggregation
|
||||||
|
|
|
@ -22,12 +22,14 @@ import static org.junit.Assert.assertFalse;
|
||||||
|
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.anyLong;
|
import static org.mockito.Matchers.anyLong;
|
||||||
|
import static org.mockito.Matchers.argThat;
|
||||||
import static org.mockito.Matchers.eq;
|
import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Matchers.isA;
|
import static org.mockito.Matchers.isA;
|
||||||
import static org.mockito.Mockito.doReturn;
|
import static org.mockito.Mockito.doReturn;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
@ -36,6 +38,7 @@ import java.io.IOException;
|
||||||
import java.io.NotSerializableException;
|
import java.io.NotSerializableException;
|
||||||
import java.io.ObjectInputStream;
|
import java.io.ObjectInputStream;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -66,6 +69,7 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionMatcher;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
|
||||||
|
@ -531,8 +535,8 @@ public class TestNonAggregatingLogHandler {
|
||||||
boolean matched = false;
|
boolean matched = false;
|
||||||
while (!matched && System.currentTimeMillis() < verifyStartTime + timeout) {
|
while (!matched && System.currentTimeMillis() < verifyStartTime + timeout) {
|
||||||
try {
|
try {
|
||||||
verify(delService).delete(eq(user), (Path) eq(null),
|
verify(delService, times(1)).delete(argThat(new FileDeletionMatcher(
|
||||||
Mockito.argThat(new DeletePathsMatcher(matchPaths)));
|
delService, user, null, Arrays.asList(matchPaths))));
|
||||||
matched = true;
|
matched = true;
|
||||||
} catch (WantedButNotInvoked e) {
|
} catch (WantedButNotInvoked e) {
|
||||||
notInvokedException = e;
|
notInvokedException = e;
|
||||||
|
|
Loading…
Reference in New Issue