From 072360d1286cf1de3f83664755144b4d468dae6e Mon Sep 17 00:00:00 2001 From: Junping Du Date: Tue, 17 Jun 2014 01:02:16 +0000 Subject: [PATCH] YARN-1339. Recover DeletionService state upon nodemanager restart. (Contributed by Jason Lowe) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1603036 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../server/nodemanager/DeletionService.java | 180 +++++++++++++++++- .../yarn/server/nodemanager/NodeManager.java | 2 +- .../recovery/NMLeveldbStateStoreService.java | 54 ++++++ .../recovery/NMNullStateStoreService.java | 17 ++ .../recovery/NMStateStoreService.java | 18 ++ .../yarn_server_nodemanager_recovery.proto | 9 + .../nodemanager/TestDeletionService.java | 55 ++++++ .../recovery/NMMemoryStateStoreService.java | 27 +++ .../TestNMLeveldbStateStoreService.java | 56 ++++++ 10 files changed, 411 insertions(+), 10 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 32b914d948c..ab5ae5ac9c5 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -39,6 +39,9 @@ Release 2.5.0 - UNRELEASED YARN-1702. Added kill app functionality to RM web services. (Varun Vasudev via vinodkv) + YARN-1339. Recover DeletionService state upon nodemanager restart. (Jason Lowe + via junping_du) + IMPROVEMENTS YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java index 45504fdcdd7..e4025f5da1d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java @@ -21,10 +21,13 @@ package org.apache.hadoop.yarn.server.nodemanager; import static java.util.concurrent.TimeUnit.SECONDS; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; @@ -40,6 +43,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -50,6 +57,8 @@ public class DeletionService extends AbstractService { private final ContainerExecutor exec; private ScheduledThreadPoolExecutor sched; private static final FileContext lfs = getLfs(); + private final NMStateStoreService stateStore; + private AtomicInteger nextTaskId = new AtomicInteger(0); static final FileContext getLfs() { try { @@ -60,13 +69,17 @@ public class DeletionService extends AbstractService { } public DeletionService(ContainerExecutor exec) { + this(exec, new NMNullStateStoreService()); + } + + public DeletionService(ContainerExecutor exec, + NMStateStoreService stateStore) { super(DeletionService.class.getName()); this.exec = exec; this.debugDelay = 0; + this.stateStore = stateStore; } - /** - * /** * Delete the path(s) as this user. * @param user The user to delete as, or the JVM user if null @@ -76,19 +89,20 @@ public class DeletionService extends AbstractService { public void delete(String user, Path subDir, Path... baseDirs) { // TODO if parent owned by NM, rename within parent inline if (debugDelay != -1) { - if (baseDirs == null || baseDirs.length == 0) { - sched.schedule(new FileDeletionTask(this, user, subDir, null), - debugDelay, TimeUnit.SECONDS); - } else { - sched.schedule( - new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs)), - debugDelay, TimeUnit.SECONDS); + List baseDirList = null; + if (baseDirs != null && baseDirs.length != 0) { + baseDirList = Arrays.asList(baseDirs); } + FileDeletionTask task = + new FileDeletionTask(this, user, subDir, baseDirList); + recordDeletionTaskInStateStore(task); + sched.schedule(task, debugDelay, TimeUnit.SECONDS); } } public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) { if (debugDelay != -1) { + recordDeletionTaskInStateStore(fileDeletionTask); sched.schedule(fileDeletionTask, debugDelay, TimeUnit.SECONDS); } } @@ -109,6 +123,9 @@ public class DeletionService extends AbstractService { } sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); sched.setKeepAliveTime(60L, SECONDS); + if (stateStore.canRecover()) { + recover(stateStore.loadDeletionServiceState()); + } super.serviceInit(conf); } @@ -139,6 +156,8 @@ public class DeletionService extends AbstractService { } public static class FileDeletionTask implements Runnable { + public static final int INVALID_TASK_ID = -1; + private int taskId; private final String user; private final Path subDir; private final List baseDirs; @@ -152,6 +171,12 @@ public class DeletionService extends AbstractService { private FileDeletionTask(DeletionService delService, String user, Path subDir, List baseDirs) { + this(INVALID_TASK_ID, delService, user, subDir, baseDirs); + } + + private FileDeletionTask(int taskId, DeletionService delService, + String user, Path subDir, List baseDirs) { + this.taskId = taskId; this.delService = delService; this.user = user; this.subDir = subDir; @@ -198,6 +223,12 @@ public class DeletionService extends AbstractService { return this.success; } + public synchronized FileDeletionTask[] getSuccessorTasks() { + FileDeletionTask[] successors = + new FileDeletionTask[successorTaskSet.size()]; + return successorTaskSet.toArray(successors); + } + @Override public void run() { if (LOG.isDebugEnabled()) { @@ -286,6 +317,12 @@ public class DeletionService extends AbstractService { * dependent tasks of it has failed marking its success = false. */ private synchronized void fileDeletionTaskFinished() { + try { + delService.stateStore.removeDeletionTask(taskId); + } catch (IOException e) { + LOG.error("Unable to remove deletion task " + taskId + + " from state store", e); + } Iterator successorTaskI = this.successorTaskSet.iterator(); while (successorTaskI.hasNext()) { @@ -318,4 +355,129 @@ public class DeletionService extends AbstractService { Path[] baseDirs) { return new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs)); } + + private void recover(RecoveredDeletionServiceState state) + throws IOException { + List taskProtos = state.getTasks(); + Map idToInfoMap = + new HashMap(taskProtos.size()); + Set successorTasks = new HashSet(); + for (DeletionServiceDeleteTaskProto proto : taskProtos) { + DeletionTaskRecoveryInfo info = parseTaskProto(proto); + idToInfoMap.put(info.task.taskId, info); + nextTaskId.set(Math.max(nextTaskId.get(), info.task.taskId)); + successorTasks.addAll(info.successorTaskIds); + } + + // restore the task dependencies and schedule the deletion tasks that + // have no predecessors + final long now = System.currentTimeMillis(); + for (DeletionTaskRecoveryInfo info : idToInfoMap.values()) { + for (Integer successorId : info.successorTaskIds){ + DeletionTaskRecoveryInfo successor = idToInfoMap.get(successorId); + if (successor != null) { + info.task.addFileDeletionTaskDependency(successor.task); + } else { + LOG.error("Unable to locate dependency task for deletion task " + + info.task.taskId + " at " + info.task.getSubDir()); + } + } + if (!successorTasks.contains(info.task.taskId)) { + long msecTilDeletion = info.deletionTimestamp - now; + sched.schedule(info.task, msecTilDeletion, TimeUnit.MILLISECONDS); + } + } + } + + private DeletionTaskRecoveryInfo parseTaskProto( + DeletionServiceDeleteTaskProto proto) throws IOException { + int taskId = proto.getId(); + String user = proto.hasUser() ? proto.getUser() : null; + Path subdir = null; + List basePaths = null; + if (proto.hasSubdir()) { + subdir = new Path(proto.getSubdir()); + } + List basedirs = proto.getBasedirsList(); + if (basedirs != null && basedirs.size() > 0) { + basePaths = new ArrayList(basedirs.size()); + for (String basedir : basedirs) { + basePaths.add(new Path(basedir)); + } + } + + FileDeletionTask task = new FileDeletionTask(taskId, this, user, + subdir, basePaths); + return new DeletionTaskRecoveryInfo(task, + proto.getSuccessorIdsList(), + proto.getDeletionTime()); + } + + private int generateTaskId() { + // get the next ID but avoid an invalid ID + int taskId = nextTaskId.incrementAndGet(); + while (taskId == FileDeletionTask.INVALID_TASK_ID) { + taskId = nextTaskId.incrementAndGet(); + } + return taskId; + } + + private void recordDeletionTaskInStateStore(FileDeletionTask task) { + if (!stateStore.canRecover()) { + // optimize the case where we aren't really recording + return; + } + if (task.taskId != FileDeletionTask.INVALID_TASK_ID) { + return; // task already recorded + } + + task.taskId = generateTaskId(); + + FileDeletionTask[] successors = task.getSuccessorTasks(); + + // store successors first to ensure task IDs have been generated for them + for (FileDeletionTask successor : successors) { + recordDeletionTaskInStateStore(successor); + } + + DeletionServiceDeleteTaskProto.Builder builder = + DeletionServiceDeleteTaskProto.newBuilder(); + builder.setId(task.taskId); + if (task.getUser() != null) { + builder.setUser(task.getUser()); + } + if (task.getSubDir() != null) { + builder.setSubdir(task.getSubDir().toString()); + } + builder.setDeletionTime(System.currentTimeMillis() + + TimeUnit.MILLISECONDS.convert(debugDelay, TimeUnit.SECONDS)); + if (task.getBaseDirs() != null) { + for (Path dir : task.getBaseDirs()) { + builder.addBasedirs(dir.toString()); + } + } + for (FileDeletionTask successor : successors) { + builder.addSuccessorIds(successor.taskId); + } + + try { + stateStore.storeDeletionTask(task.taskId, builder.build()); + } catch (IOException e) { + LOG.error("Unable to store deletion task " + task.taskId + " for " + + task.getSubDir(), e); + } + } + + private static class DeletionTaskRecoveryInfo { + FileDeletionTask task; + List successorTaskIds; + long deletionTimestamp; + + public DeletionTaskRecoveryInfo(FileDeletionTask task, + List successorTaskIds, long deletionTimestamp) { + this.task = task; + this.successorTaskIds = successorTaskIds; + this.deletionTimestamp = deletionTimestamp; + } + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 83b0ede5b1a..2292a0dc9de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -114,7 +114,7 @@ public class NodeManager extends CompositeService } protected DeletionService createDeletionService(ContainerExecutor exec) { - return new DeletionService(exec); + return new DeletionService(exec, nmStore); } protected NMContext createNMContext( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index d124757ca6f..02a55694d5e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -38,6 +38,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.server.utils.LeveldbIterator; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -58,6 +59,9 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private static final String DB_SCHEMA_VERSION_KEY = "schema-version"; private static final String DB_SCHEMA_VERSION = "1.0"; + private static final String DELETION_TASK_KEY_PREFIX = + "DeletionService/deltask_"; + private static final String LOCALIZATION_KEY_PREFIX = "Localization/"; private static final String LOCALIZATION_PUBLIC_KEY_PREFIX = LOCALIZATION_KEY_PREFIX + "public/"; @@ -308,6 +312,56 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } + @Override + public RecoveredDeletionServiceState loadDeletionServiceState() + throws IOException { + RecoveredDeletionServiceState state = new RecoveredDeletionServiceState(); + state.tasks = new ArrayList(); + LeveldbIterator iter = null; + try { + iter = new LeveldbIterator(db); + iter.seek(bytes(DELETION_TASK_KEY_PREFIX)); + while (iter.hasNext()) { + Entry entry = iter.next(); + String key = asString(entry.getKey()); + if (!key.startsWith(DELETION_TASK_KEY_PREFIX)) { + break; + } + state.tasks.add( + DeletionServiceDeleteTaskProto.parseFrom(entry.getValue())); + } + } catch (DBException e) { + throw new IOException(e.getMessage(), e); + } finally { + if (iter != null) { + iter.close(); + } + } + return state; + } + + @Override + public void storeDeletionTask(int taskId, + DeletionServiceDeleteTaskProto taskProto) throws IOException { + String key = DELETION_TASK_KEY_PREFIX + taskId; + try { + db.put(bytes(key), taskProto.toByteArray()); + } catch (DBException e) { + throw new IOException(e.getMessage(), e); + } + } + + @Override + public void removeDeletionTask(int taskId) throws IOException { + String key = DELETION_TASK_KEY_PREFIX + taskId; + try { + db.delete(bytes(key)); + } catch (DBException e) { + throw new IOException(e.getMessage(), e); + } + } + + @Override protected void initStorage(Configuration conf) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index d41ddde6291..dfe4f096bf3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; // The state store to use when state isn't being stored @@ -60,6 +61,22 @@ public class NMNullStateStoreService extends NMStateStoreService { Path localPath) throws IOException { } + @Override + public RecoveredDeletionServiceState loadDeletionServiceState() + throws IOException { + throw new UnsupportedOperationException( + "Recovery not supported by this state store"); + } + + @Override + public void storeDeletionTask(int taskId, + DeletionServiceDeleteTaskProto taskProto) throws IOException { + } + + @Override + public void removeDeletionTask(int taskId) throws IOException { + } + @Override protected void initStorage(Configuration conf) throws IOException { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 295fdb97b84..f2e594528be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; @Private @@ -91,6 +92,14 @@ public abstract class NMStateStoreService extends AbstractService { } } + public static class RecoveredDeletionServiceState { + List tasks; + + public List getTasks() { + return tasks; + } + } + /** Initialize the state storage */ @Override public void serviceInit(Configuration conf) throws IOException { @@ -155,6 +164,15 @@ public abstract class NMStateStoreService extends AbstractService { ApplicationId appId, Path localPath) throws IOException; + public abstract RecoveredDeletionServiceState loadDeletionServiceState() + throws IOException; + + public abstract void storeDeletionTask(int taskId, + DeletionServiceDeleteTaskProto taskProto) throws IOException; + + public abstract void removeDeletionTask(int taskId) throws IOException; + + protected abstract void initStorage(Configuration conf) throws IOException; protected abstract void startStorage() throws IOException; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto index bd1f74a965f..9546dbbe70d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto @@ -24,6 +24,15 @@ package hadoop.yarn; import "yarn_protos.proto"; +message DeletionServiceDeleteTaskProto { + optional int32 id = 1; + optional string user = 2; + optional string subdir = 3; + optional int64 deletionTime = 4; + repeated string basedirs = 5; + repeated int32 successorIds = 6; +} + message LocalizedResourceProto { optional LocalResourceProto resource = 1; optional string localPath = 2; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java index 69208c506ea..c01ea15146f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.junit.AfterClass; import org.junit.Test; import org.mockito.Mockito; @@ -285,4 +286,58 @@ public class TestDeletionService { del.stop(); } } + + @Test + public void testRecovery() throws Exception { + Random r = new Random(); + long seed = r.nextLong(); + r.setSeed(seed); + System.out.println("SEED: " + seed); + List baseDirs = buildDirs(r, base, 4); + createDirs(new Path("."), baseDirs); + List content = buildDirs(r, new Path("."), 10); + for (Path b : baseDirs) { + createDirs(b, content); + } + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + conf.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 1); + NMMemoryStateStoreService stateStore = new NMMemoryStateStoreService(); + stateStore.init(conf); + stateStore.start(); + DeletionService del = + new DeletionService(new FakeDefaultContainerExecutor(), stateStore); + try { + del.init(conf); + del.start(); + for (Path p : content) { + assertTrue(lfs.util().exists(new Path(baseDirs.get(0), p))); + del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", + p, baseDirs.toArray(new Path[4])); + } + + // restart the deletion service + del.stop(); + del = new DeletionService(new FakeDefaultContainerExecutor(), + stateStore); + del.init(conf); + del.start(); + + // verify paths are still eventually deleted + int msecToWait = 10 * 1000; + for (Path p : baseDirs) { + for (Path q : content) { + Path fp = new Path(p, q); + while (msecToWait > 0 && lfs.util().exists(fp)) { + Thread.sleep(100); + msecToWait -= 100; + } + assertFalse(lfs.util().exists(fp)); + } + } + } finally { + del.close(); + stateStore.close(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index a146e7b4c93..0c8a8439b47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.nodemanager.recovery; +import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.Map; @@ -25,10 +27,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; public class NMMemoryStateStoreService extends NMStateStoreService { private Map trackerStates; + private Map deleteTasks; public NMMemoryStateStoreService() { super(NMMemoryStateStoreService.class.getName()); @@ -110,6 +114,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService { @Override protected void initStorage(Configuration conf) { trackerStates = new HashMap(); + deleteTasks = new HashMap(); } @Override @@ -121,6 +126,28 @@ public class NMMemoryStateStoreService extends NMStateStoreService { } + @Override + public RecoveredDeletionServiceState loadDeletionServiceState() + throws IOException { + RecoveredDeletionServiceState result = + new RecoveredDeletionServiceState(); + result.tasks = new ArrayList( + deleteTasks.values()); + return result; + } + + @Override + public synchronized void storeDeletionTask(int taskId, + DeletionServiceDeleteTaskProto taskProto) throws IOException { + deleteTasks.put(taskId, taskProto); + } + + @Override + public synchronized void removeDeletionTask(int taskId) throws IOException { + deleteTasks.remove(taskId); + } + + private static class TrackerState { Map inProgressMap = new HashMap(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index c970c1c3d1f..494b27fff73 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -35,8 +35,10 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -404,4 +406,58 @@ public class TestNMLeveldbStateStoreService { state.getUserResources(); assertTrue(userResources.isEmpty()); } + + @Test + public void testDeletionTaskStorage() throws IOException { + // test empty when no state + RecoveredDeletionServiceState state = + stateStore.loadDeletionServiceState(); + assertTrue(state.getTasks().isEmpty()); + + // store a deletion task and verify recovered + DeletionServiceDeleteTaskProto proto = + DeletionServiceDeleteTaskProto.newBuilder() + .setId(7) + .setUser("someuser") + .setSubdir("some/subdir") + .addBasedirs("some/dir/path") + .addBasedirs("some/other/dir/path") + .setDeletionTime(123456L) + .addSuccessorIds(8) + .addSuccessorIds(9) + .build(); + stateStore.storeDeletionTask(proto.getId(), proto); + restartStateStore(); + state = stateStore.loadDeletionServiceState(); + assertEquals(1, state.getTasks().size()); + assertEquals(proto, state.getTasks().get(0)); + + // store another deletion task + DeletionServiceDeleteTaskProto proto2 = + DeletionServiceDeleteTaskProto.newBuilder() + .setId(8) + .setUser("user2") + .setSubdir("subdir2") + .setDeletionTime(789L) + .build(); + stateStore.storeDeletionTask(proto2.getId(), proto2); + restartStateStore(); + state = stateStore.loadDeletionServiceState(); + assertEquals(2, state.getTasks().size()); + assertTrue(state.getTasks().contains(proto)); + assertTrue(state.getTasks().contains(proto2)); + + // delete a task and verify gone after recovery + stateStore.removeDeletionTask(proto2.getId()); + restartStateStore(); + state = stateStore.loadDeletionServiceState(); + assertEquals(1, state.getTasks().size()); + assertEquals(proto, state.getTasks().get(0)); + + // delete the last task and verify none left + stateStore.removeDeletionTask(proto.getId()); + restartStateStore(); + state = stateStore.loadDeletionServiceState(); + assertTrue(state.getTasks().isEmpty()); + } }