From 0ce55c699cca53ad1231737675eae63dd9bbf6f0 Mon Sep 17 00:00:00 2001 From: Eric E Payne Date: Thu, 9 Apr 2020 17:18:07 +0000 Subject: [PATCH] YARN-8242. YARN NM: OOM error while reading back the state store on recovery. Contributed by Pradeep Ambati and Kanwaljeet Sachdev YARN-10227. Pull YARN-8242 back to branch-2.10. Contributed by Jim Brennan --- .../server/nodemanager/DeletionService.java | 25 +- .../ContainerManagerImpl.java | 26 +- .../ResourceLocalizationService.java | 56 ++- .../recovery/NMLeveldbStateStoreService.java | 432 +++++++++++------- .../recovery/NMNullStateStoreService.java | 2 +- .../recovery/NMStateStoreService.java | 55 ++- .../recovery/RecoveryIterator.java | 41 ++ .../NMContainerTokenSecretManager.java | 27 +- .../security/NMTokenSecretManagerInNM.java | 15 +- .../recovery/NMMemoryStateStoreService.java | 82 +++- .../TestNMLeveldbStateStoreService.java | 208 ++++++--- 11 files changed, 652 insertions(+), 317 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java index ae81dc11187..e665c5ad2b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java @@ -19,13 +19,14 @@ package org.apache.hadoop.yarn.server.nodemanager; import static java.util.concurrent.TimeUnit.SECONDS; + +import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -96,16 +97,20 @@ public void delete(DeletionTask deletionTask) { private void recover(NMStateStoreService.RecoveredDeletionServiceState state) throws IOException { - List taskProtos = state.getTasks(); Map idToInfoMap = - new HashMap<>(taskProtos.size()); - Set successorTasks = new HashSet<>(); - for (DeletionServiceDeleteTaskProto proto : taskProtos) { - DeletionTaskRecoveryInfo info = - NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this); - idToInfoMap.put(info.getTask().getTaskId(), info); - nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId())); - successorTasks.addAll(info.getSuccessorTaskIds()); + new HashMap(); + Set successorTasks = new HashSet(); + + try (RecoveryIterator it = + state.getIterator()) { + while (it.hasNext()) { + DeletionServiceDeleteTaskProto proto = it.next(); + DeletionTaskRecoveryInfo info = + NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this); + idToInfoMap.put(info.getTask().getTaskId(), info); + nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId())); + successorTasks.addAll(info.getSuccessorTaskIds()); + } } // restore the task dependencies and schedule the deletion tasks that diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 6f6fa7f5237..58e2a1f5dd5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -23,6 +23,7 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent; +import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -356,19 +357,26 @@ private void recover() throws IOException, URISyntaxException { stateStore.loadLocalizationState()); RecoveredApplicationsState appsState = stateStore.loadApplicationsState(); - for (ContainerManagerApplicationProto proto : - appsState.getApplications()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Recovering application with state: " + proto.toString()); + try (RecoveryIterator rasIterator = + appsState.getIterator()) { + while (rasIterator.hasNext()) { + ContainerManagerApplicationProto proto = rasIterator.next(); + if (LOG.isDebugEnabled()) { + LOG.debug("Recovering application with state: " + proto.toString()); + } + recoverApplication(proto); } - recoverApplication(proto); } - for (RecoveredContainerState rcs : stateStore.loadContainersState()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Recovering container with state: " + rcs); + try (RecoveryIterator rcsIterator = + stateStore.getContainerStateIterator()) { + while (rcsIterator.hasNext()) { + RecoveredContainerState rcs = rcsIterator.next(); + if (LOG.isDebugEnabled()) { + LOG.debug("Recovering container with state: " + rcs); + } + recoverContainer(rcs); } - recoverContainer(rcs); } // Recovery AMRMProxy state after apps and containers are recovered diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index b751aa60e45..25df84350c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -19,6 +19,8 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; + +import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -296,42 +298,46 @@ public void onDirsChanged() { //Recover localized resources after an NM restart public void recoverLocalizedResources(RecoveredLocalizationState state) - throws URISyntaxException { + throws URISyntaxException, IOException { LocalResourceTrackerState trackerState = state.getPublicTrackerState(); recoverTrackerResources(publicRsrc, trackerState); - for (Map.Entry userEntry : - state.getUserResources().entrySet()) { - String user = userEntry.getKey(); - RecoveredUserResources userResources = userEntry.getValue(); - trackerState = userResources.getPrivateTrackerState(); - if (!trackerState.isEmpty()) { - LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, - null, dispatcher, true, super.getConfig(), stateStore, dirsHandler); - LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user, - tracker); - if (oldTracker != null) { - tracker = oldTracker; - } - recoverTrackerResources(tracker, trackerState); - } - - for (Map.Entry appEntry : - userResources.getAppTrackerStates().entrySet()) { - trackerState = appEntry.getValue(); + try (RecoveryIterator> it + = state.getIterator()) { + while (it.hasNext()) { + Map.Entry userEntry = it.next(); + String user = userEntry.getKey(); + RecoveredUserResources userResources = userEntry.getValue(); + trackerState = userResources.getPrivateTrackerState(); if (!trackerState.isEmpty()) { - ApplicationId appId = appEntry.getKey(); - String appIdStr = appId.toString(); LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, - appId, dispatcher, false, super.getConfig(), stateStore, + null, dispatcher, true, super.getConfig(), stateStore, dirsHandler); - LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr, + LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user, tracker); if (oldTracker != null) { tracker = oldTracker; } recoverTrackerResources(tracker, trackerState); } + + for (Map.Entry appEntry : + userResources.getAppTrackerStates().entrySet()) { + trackerState = appEntry.getValue(); + if (!trackerState.isEmpty()) { + ApplicationId appId = appEntry.getKey(); + String appIdStr = appId.toString(); + LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, + appId, dispatcher, false, super.getConfig(), stateStore, + dirsHandler); + LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr, + tracker); + if (oldTracker != null) { + tracker = oldTracker; + } + recoverTrackerResources(tracker, trackerState); + } + } } } } @@ -557,7 +563,7 @@ private void handleCleanupContainerResources( rsrcCleanup.getResources(); for (Map.Entry> e : rsrcs.entrySet()) { - LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(), + LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(), c.getContainerId().getApplicationAttemptId() .getApplicationId()); for (LocalResourceRequest req : e.getValue()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 49c27645627..ee6bf6be766 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -67,6 +67,7 @@ import java.io.File; import java.io.IOException; import java.io.Serializable; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -74,6 +75,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.NoSuchElementException; import java.util.Set; import java.util.Timer; import java.util.TimerTask; @@ -225,68 +227,119 @@ boolean isHealthy() { return isHealthy; } - @Override - public List loadContainersState() + // LeveldbIterator starting at startkey + private LeveldbIterator getLevelDBIterator(String startKey) throws IOException { - ArrayList containers = - new ArrayList(); - ArrayList containersToRemove = - new ArrayList(); - LeveldbIterator iter = null; try { - iter = new LeveldbIterator(db); - iter.seek(bytes(CONTAINERS_KEY_PREFIX)); + LeveldbIterator it = new LeveldbIterator(db); + it.seek(bytes(startKey)); + return it; + } catch (DBException e) { + throw new IOException(e); + } + } - while (iter.hasNext()) { - Entry entry = iter.peekNext(); + // Base Recovery Iterator + private abstract class BaseRecoveryIterator implements + RecoveryIterator { + LeveldbIterator it; + T nextItem; + + BaseRecoveryIterator(String dbKey) throws IOException { + this.it = getLevelDBIterator(dbKey); + this.nextItem = null; + } + + protected abstract T getNextItem(LeveldbIterator it) throws IOException; + + @Override + public boolean hasNext() throws IOException { + if (nextItem == null) { + nextItem = getNextItem(it); + } + return (nextItem != null); + } + + @Override + public T next() throws IOException, NoSuchElementException { + T tmp = nextItem; + if (tmp != null) { + nextItem = null; + return tmp; + } else { + tmp = getNextItem(it); + if (tmp == null) { + throw new NoSuchElementException(); + } + return tmp; + } + } + + @Override + public void close() throws IOException { + if (it != null) { + it.close(); + } + } + } + + // Container Recovery Iterator + private class ContainerStateIterator extends + BaseRecoveryIterator { + ContainerStateIterator() throws IOException { + super(CONTAINERS_KEY_PREFIX); + } + + @Override + protected RecoveredContainerState getNextItem(LeveldbIterator it) + throws IOException { + return getNextRecoveredContainer(it); + } + } + + private RecoveredContainerState getNextRecoveredContainer(LeveldbIterator it) + throws IOException { + RecoveredContainerState rcs = null; + try { + while (it.hasNext()) { + Entry entry = it.peekNext(); String key = asString(entry.getKey()); if (!key.startsWith(CONTAINERS_KEY_PREFIX)) { - break; + return null; } int idEndPos = key.indexOf('/', CONTAINERS_KEY_PREFIX.length()); if (idEndPos < 0) { throw new IOException("Unable to determine container in key: " + key); } - ContainerId containerId = ContainerId.fromString( - key.substring(CONTAINERS_KEY_PREFIX.length(), idEndPos)); - String keyPrefix = key.substring(0, idEndPos+1); - RecoveredContainerState rcs = loadContainerState(containerId, - iter, keyPrefix); - // Don't load container without StartContainerRequest + String keyPrefix = key.substring(0, idEndPos + 1); + rcs = loadContainerState(it, keyPrefix); if (rcs.startRequest != null) { - containers.add(rcs); + break; } else { - containersToRemove.add(containerId); + removeContainer(rcs.getContainerId()); + rcs = null; } } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } - - // remove container without StartContainerRequest - for (ContainerId containerId : containersToRemove) { - LOG.warn("Remove container " + containerId + - " with incomplete records"); - try { - removeContainer(containerId); - // TODO: kill and cleanup the leaked container - } catch (IOException e) { - LOG.error("Unable to remove container " + containerId + - " in store", e); - } - } - - return containers; + return rcs; } - private RecoveredContainerState loadContainerState(ContainerId containerId, - LeveldbIterator iter, String keyPrefix) throws IOException { - RecoveredContainerState rcs = new RecoveredContainerState(); + + @Override + public RecoveryIterator getContainerStateIterator() + throws IOException { + return new ContainerStateIterator(); + } + + private RecoveredContainerState loadContainerState(LeveldbIterator iter, + String keyPrefix) throws IOException { + ContainerId containerId = ContainerId.fromString( + keyPrefix.substring(CONTAINERS_KEY_PREFIX.length(), + keyPrefix.length()-1)); + RecoveredContainerState rcs = new RecoveredContainerState(containerId); rcs.status = RecoveredContainerStatus.REQUESTED; while (iter.hasNext()) { Entry entry = iter.peekNext(); @@ -650,35 +703,45 @@ public void removeContainer(ContainerId containerId) } + // Application Recovery Iterator + private class ApplicationStateIterator extends + BaseRecoveryIterator { + ApplicationStateIterator() throws IOException { + super(APPLICATIONS_KEY_PREFIX); + } + + @Override + protected ContainerManagerApplicationProto getNextItem(LeveldbIterator it) + throws IOException { + return getNextRecoveredApplication(it); + } + } + + private ContainerManagerApplicationProto getNextRecoveredApplication( + LeveldbIterator it) throws IOException { + ContainerManagerApplicationProto applicationProto = null; + try { + if (it.hasNext()) { + Entry entry = it.next(); + String key = asString(entry.getKey()); + if (!key.startsWith(APPLICATIONS_KEY_PREFIX)) { + return null; + } + applicationProto = ContainerManagerApplicationProto.parseFrom( + entry.getValue()); + } + } catch (DBException e) { + throw new IOException(e); + } + return applicationProto; + } + @Override public RecoveredApplicationsState loadApplicationsState() throws IOException { RecoveredApplicationsState state = new RecoveredApplicationsState(); - state.applications = new ArrayList(); - String keyPrefix = APPLICATIONS_KEY_PREFIX; - LeveldbIterator iter = null; - try { - iter = new LeveldbIterator(db); - iter.seek(bytes(keyPrefix)); - while (iter.hasNext()) { - Entry entry = iter.next(); - String key = asString(entry.getKey()); - if (!key.startsWith(keyPrefix)) { - break; - } - state.applications.add( - ContainerManagerApplicationProto.parseFrom(entry.getValue())); - } - } catch (DBException e) { - throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } - } - + state.it = new ApplicationStateIterator(); cleanupDeprecatedFinishedApps(); - return state; } @@ -722,24 +785,29 @@ public void removeApplication(ApplicationId appId) } - @Override - public RecoveredLocalizationState loadLocalizationState() - throws IOException { - RecoveredLocalizationState state = new RecoveredLocalizationState(); + // User Resource Recovery Iterator. + private class UserResourcesIterator extends + BaseRecoveryIterator> { + UserResourcesIterator() throws IOException { + super(LOCALIZATION_PRIVATE_KEY_PREFIX); + } - LeveldbIterator iter = null; + @Override + protected Entry getNextItem( + LeveldbIterator it) throws IOException { + return getNextRecoveredPrivateLocalizationEntry(it); + } + } + + private Entry getNextRecoveredPrivateLocalizationEntry( + LeveldbIterator it) throws IOException { + Entry localEntry = null; try { - iter = new LeveldbIterator(db); - iter.seek(bytes(LOCALIZATION_PUBLIC_KEY_PREFIX)); - state.publicTrackerState = loadResourceTrackerState(iter, - LOCALIZATION_PUBLIC_KEY_PREFIX); - - iter.seek(bytes(LOCALIZATION_PRIVATE_KEY_PREFIX)); - while (iter.hasNext()) { - Entry entry = iter.peekNext(); + if (it.hasNext()) { + Entry entry = it.peekNext(); String key = asString(entry.getKey()); if (!key.startsWith(LOCALIZATION_PRIVATE_KEY_PREFIX)) { - break; + return null; } int userEndPos = key.indexOf('/', @@ -750,17 +818,24 @@ public RecoveredLocalizationState loadLocalizationState() } String user = key.substring( LOCALIZATION_PRIVATE_KEY_PREFIX.length(), userEndPos); - state.userResources.put(user, loadUserLocalizedResources(iter, - key.substring(0, userEndPos+1))); + RecoveredUserResources val = loadUserLocalizedResources(it, + key.substring(0, userEndPos+1)); + localEntry = new AbstractMap.SimpleEntry<>(user, val); } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } + return localEntry; + } + @Override + public RecoveredLocalizationState loadLocalizationState() + throws IOException { + RecoveredLocalizationState state = new RecoveredLocalizationState(); + LeveldbIterator it = getLevelDBIterator(LOCALIZATION_PUBLIC_KEY_PREFIX); + state.publicTrackerState = loadResourceTrackerState(it, + LOCALIZATION_PUBLIC_KEY_PREFIX); + state.it = new UserResourcesIterator(); return state; } @@ -770,7 +845,7 @@ private LocalResourceTrackerState loadResourceTrackerState( final String startedPrefix = keyPrefix + LOCALIZATION_STARTED_SUFFIX; LocalResourceTrackerState state = new LocalResourceTrackerState(); while (iter.hasNext()) { - Entry entry = iter.peekNext(); + Entry entry = iter.peekNext(); String key = asString(entry.getKey()); if (!key.startsWith(keyPrefix)) { break; @@ -951,32 +1026,44 @@ private String getResourceTrackerKeyPrefix(String user, + LOCALIZATION_APPCACHE_SUFFIX + appId + "/"; } + // Deletion State Recovery Iterator. + private class DeletionStateIterator extends + BaseRecoveryIterator { + DeletionStateIterator() throws IOException { + super(DELETION_TASK_KEY_PREFIX); + } + + @Override + protected DeletionServiceDeleteTaskProto getNextItem(LeveldbIterator it) + throws IOException { + return getNextRecoveredDeletionService(it); + } + } + + private DeletionServiceDeleteTaskProto getNextRecoveredDeletionService( + LeveldbIterator it) throws IOException { + DeletionServiceDeleteTaskProto deleteProto = null; + try { + if (it.hasNext()) { + Entry entry = it.next(); + String key = asString(entry.getKey()); + if (!key.startsWith(DELETION_TASK_KEY_PREFIX)) { + return null; + } + deleteProto = DeletionServiceDeleteTaskProto.parseFrom( + entry.getValue()); + } + } catch (DBException e) { + throw new IOException(e); + } + return deleteProto; + } @Override public RecoveredDeletionServiceState loadDeletionServiceState() throws IOException { RecoveredDeletionServiceState state = new RecoveredDeletionServiceState(); - state.tasks = new ArrayList(); - LeveldbIterator iter = null; - try { - iter = new LeveldbIterator(db); - iter.seek(bytes(DELETION_TASK_KEY_PREFIX)); - while (iter.hasNext()) { - Entry entry = iter.next(); - String key = asString(entry.getKey()); - if (!key.startsWith(DELETION_TASK_KEY_PREFIX)) { - break; - } - state.tasks.add( - DeletionServiceDeleteTaskProto.parseFrom(entry.getValue())); - } - } catch (DBException e) { - throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } - } + state.it = new DeletionStateIterator(); return state; } @@ -1003,29 +1090,44 @@ public void removeDeletionTask(int taskId) throws IOException { } } + private MasterKey getMasterKey(String dbKey) throws IOException { + try{ + byte[] data = db.get(bytes(dbKey)); + if (data == null || data.length == 0) { + return null; + } + return parseMasterKey(data); + } catch (DBException e) { + throw new IOException(e); + } + } - @Override - public RecoveredNMTokensState loadNMTokensState() throws IOException { - RecoveredNMTokensState state = new RecoveredNMTokensState(); - state.applicationMasterKeys = - new HashMap(); - LeveldbIterator iter = null; + // Recover NMTokens Iterator + private class NMTokensStateIterator extends + BaseRecoveryIterator> { + NMTokensStateIterator() throws IOException { + super(NM_TOKENS_KEY_PREFIX); + } + + @Override + protected Entry getNextItem( + LeveldbIterator it) throws IOException { + return getNextMasterKeyEntry(it); + } + } + + private Entry getNextMasterKeyEntry( + LeveldbIterator it) throws IOException { + Entry masterKeyentry = null; try { - iter = new LeveldbIterator(db); - iter.seek(bytes(NM_TOKENS_KEY_PREFIX)); - while (iter.hasNext()) { - Entry entry = iter.next(); + while (it.hasNext()) { + Entry entry = it.next(); String fullKey = asString(entry.getKey()); if (!fullKey.startsWith(NM_TOKENS_KEY_PREFIX)) { break; } String key = fullKey.substring(NM_TOKENS_KEY_PREFIX.length()); - if (key.equals(CURRENT_MASTER_KEY_SUFFIX)) { - state.currentMasterKey = parseMasterKey(entry.getValue()); - } else if (key.equals(PREV_MASTER_KEY_SUFFIX)) { - state.previousMasterKey = parseMasterKey(entry.getValue()); - } else if (key.startsWith( - ApplicationAttemptId.appAttemptIdStrPrefix)) { + if (key.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { ApplicationAttemptId attempt; try { attempt = ApplicationAttemptId.fromString(key); @@ -1033,17 +1135,25 @@ public RecoveredNMTokensState loadNMTokensState() throws IOException { throw new IOException("Bad application master key state for " + fullKey, e); } - state.applicationMasterKeys.put(attempt, + masterKeyentry = new AbstractMap.SimpleEntry<>(attempt, parseMasterKey(entry.getValue())); + break; } } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } + return masterKeyentry; + } + + @Override + public RecoveredNMTokensState loadNMTokensState() throws IOException { + RecoveredNMTokensState state = new RecoveredNMTokensState(); + state.currentMasterKey = getMasterKey(NM_TOKENS_KEY_PREFIX + + CURRENT_MASTER_KEY_SUFFIX); + state.previousMasterKey = getMasterKey(NM_TOKENS_KEY_PREFIX + + PREV_MASTER_KEY_SUFFIX); + state.it = new NMTokensStateIterator(); return state; } @@ -1092,45 +1202,45 @@ private void storeMasterKey(String dbKey, MasterKey key) } } + // Recover ContainersToken Iterator. + private class ContainerTokensStateIterator extends + BaseRecoveryIterator> { + ContainerTokensStateIterator() throws IOException { + super(CONTAINER_TOKENS_KEY_PREFIX); + } - @Override - public RecoveredContainerTokensState loadContainerTokensState() + @Override + protected Entry getNextItem(LeveldbIterator it) + throws IOException { + return getNextContainerToken(it); + } + } + + private Entry getNextContainerToken(LeveldbIterator it) throws IOException { - RecoveredContainerTokensState state = new RecoveredContainerTokensState(); - state.activeTokens = new HashMap(); - LeveldbIterator iter = null; + Entry containerTokenEntry = null; try { - iter = new LeveldbIterator(db); - iter.seek(bytes(CONTAINER_TOKENS_KEY_PREFIX)); - final int containerTokensKeyPrefixLength = - CONTAINER_TOKENS_KEY_PREFIX.length(); - while (iter.hasNext()) { - Entry entry = iter.next(); + while (it.hasNext()) { + Entry entry = it.next(); String fullKey = asString(entry.getKey()); if (!fullKey.startsWith(CONTAINER_TOKENS_KEY_PREFIX)) { break; } - String key = fullKey.substring(containerTokensKeyPrefixLength); - if (key.equals(CURRENT_MASTER_KEY_SUFFIX)) { - state.currentMasterKey = parseMasterKey(entry.getValue()); - } else if (key.equals(PREV_MASTER_KEY_SUFFIX)) { - state.previousMasterKey = parseMasterKey(entry.getValue()); - } else if (key.startsWith(ConverterUtils.CONTAINER_PREFIX)) { - loadContainerToken(state, fullKey, key, entry.getValue()); + String key = fullKey.substring(CONTAINER_TOKENS_KEY_PREFIX.length()); + if (key.startsWith(ConverterUtils.CONTAINER_PREFIX)) { + containerTokenEntry = loadContainerToken(fullKey, key, + entry.getValue()); + break; } } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } - return state; + return containerTokenEntry; } - private static void loadContainerToken(RecoveredContainerTokensState state, - String key, String containerIdStr, byte[] value) throws IOException { + private static Entry loadContainerToken(String key, + String containerIdStr, byte[] value) throws IOException { ContainerId containerId; Long expTime; try { @@ -1139,7 +1249,19 @@ private static void loadContainerToken(RecoveredContainerTokensState state, } catch (IllegalArgumentException e) { throw new IOException("Bad container token state for " + key, e); } - state.activeTokens.put(containerId, expTime); + return new AbstractMap.SimpleEntry<>(containerId, expTime); + } + + @Override + public RecoveredContainerTokensState loadContainerTokensState() + throws IOException { + RecoveredContainerTokensState state = new RecoveredContainerTokensState(); + state.currentMasterKey = getMasterKey(CONTAINER_TOKENS_KEY_PREFIX + + CURRENT_MASTER_KEY_SUFFIX); + state.previousMasterKey = getMasterKey(CONTAINER_TOKENS_KEY_PREFIX + + PREV_MASTER_KEY_SUFFIX); + state.it = new ContainerTokensStateIterator(); + return state; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index 95ec61ae1a3..3bfc25d5e8f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -65,7 +65,7 @@ public void removeApplication(ApplicationId appId) throws IOException { } @Override - public List loadContainersState() + public RecoveryIterator getContainerStateIterator() throws IOException { throw new UnsupportedOperationException( "Recovery not supported by this state store"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index bedf2a517cd..50eeaeff280 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -67,12 +68,11 @@ public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) { } public static class RecoveredApplicationsState { - List applications; + RecoveryIterator it = null; - public List getApplications() { - return applications; + public RecoveryIterator getIterator() { + return it; } - } /** @@ -105,6 +105,15 @@ public static class RecoveredContainerState { RecoveredContainerType.RECOVER; private long startTime; private ResourceMappings resMappings = new ResourceMappings(); + private final ContainerId containerId; + + RecoveredContainerState(ContainerId containerId){ + this.containerId = containerId; + } + + public ContainerId getContainerId() { + return containerId; + } public RecoveredContainerStatus getStatus() { return status; @@ -237,30 +246,33 @@ public LocalResourceTrackerState getPrivateTrackerState() { public static class RecoveredLocalizationState { LocalResourceTrackerState publicTrackerState = new LocalResourceTrackerState(); - Map userResources = - new HashMap(); + RecoveryIterator> it = null; public LocalResourceTrackerState getPublicTrackerState() { return publicTrackerState; } - public Map getUserResources() { - return userResources; + public RecoveryIterator> getIterator() { + return it; } } public static class RecoveredDeletionServiceState { - List tasks; + RecoveryIterator it = null; - public List getTasks() { - return tasks; + public RecoveryIterator getIterator(){ + return it; } } public static class RecoveredNMTokensState { MasterKey currentMasterKey; MasterKey previousMasterKey; - Map applicationMasterKeys; + RecoveryIterator> it = null; + + public RecoveryIterator> getIterator() { + return it; + } public MasterKey getCurrentMasterKey() { return currentMasterKey; @@ -270,15 +282,16 @@ public MasterKey getPreviousMasterKey() { return previousMasterKey; } - public Map getApplicationMasterKeys() { - return applicationMasterKeys; - } } public static class RecoveredContainerTokensState { MasterKey currentMasterKey; MasterKey previousMasterKey; - Map activeTokens; + RecoveryIterator> it = null; + + public RecoveryIterator> getIterator() { + return it; + } public MasterKey getCurrentMasterKey() { return currentMasterKey; @@ -288,9 +301,6 @@ public MasterKey getPreviousMasterKey() { return previousMasterKey; } - public Map getActiveTokens() { - return activeTokens; - } } public static class RecoveredLogDeleterState { @@ -389,11 +399,10 @@ public abstract void removeApplication(ApplicationId appId) /** - * Load the state of containers - * @return recovered state for containers - * @throws IOException + * get the Recovered Container State Iterator + * @return recovery iterator */ - public abstract List loadContainersState() + public abstract RecoveryIterator getContainerStateIterator() throws IOException; /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java new file mode 100644 index 00000000000..0bb262a6b53 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.recovery; + +import java.io.Closeable; +import java.io.IOException; +import java.util.NoSuchElementException; + +/** + * A wrapper for a Iterator to translate the raw RuntimeExceptions that + * can be thrown into IOException. + */ +public interface RecoveryIterator extends Closeable { + + /** + * Returns true if the iteration has more elements. + */ + boolean hasNext() throws IOException; + + /** + * Returns the next element in the iteration. + */ + T next() throws IOException, NoSuchElementException; + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java index 256f649cabf..b3df69b5554 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.Map.Entry; import java.util.TreeMap; + +import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,17 +92,20 @@ public synchronized void recover() super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1; } - for (Entry entry : state.getActiveTokens().entrySet()) { - ContainerId containerId = entry.getKey(); - Long expTime = entry.getValue(); - List containerList = - recentlyStartedContainerTracker.get(expTime); - if (containerList == null) { - containerList = new ArrayList(); - recentlyStartedContainerTracker.put(expTime, containerList); - } - if (!containerList.contains(containerId)) { - containerList.add(containerId); + try (RecoveryIterator> it = state.getIterator()) { + while (it.hasNext()) { + Entry entry = it.next(); + ContainerId containerId = entry.getKey(); + Long expTime = entry.getValue(); + List containerList = + recentlyStartedContainerTracker.get(expTime); + if (containerList == null) { + containerList = new ArrayList(); + recentlyStartedContainerTracker.put(expTime, containerList); + } + if (!containerList.contains(containerId)) { + containerList.add(containerId); + } } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java index 0956e77f7fa..f8957915677 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java @@ -23,6 +23,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; + +import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,11 +89,14 @@ public synchronized void recover() super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1; } - for (Map.Entry entry : - state.getApplicationMasterKeys().entrySet()) { - key = entry.getValue(); - oldMasterKeys.put(entry.getKey(), - new MasterKeyData(key, createSecretKey(key.getBytes().array()))); + try (RecoveryIterator> it = + state.getIterator()) { + while (it.hasNext()) { + Map.Entry entry = it.next(); + key = entry.getValue(); + oldMasterKeys.put(entry.getKey(), + new MasterKeyData(key, createSecretKey(key.getBytes().array()))); + } } // reconstruct app to app attempts map diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index 4364709b56f..25d96da7a67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -56,6 +57,8 @@ public class NMMemoryStateStoreService extends NMStateStoreService { private Map deleteTasks; private RecoveredNMTokensState nmTokenState; private RecoveredContainerTokensState containerTokenState; + private Map applicationMasterKeys; + private Map activeTokens; private Map logDeleterState; private RecoveredAMRMProxyState amrmProxyState; @@ -68,10 +71,9 @@ protected void initStorage(Configuration conf) { apps = new HashMap(); containerStates = new HashMap(); nmTokenState = new RecoveredNMTokensState(); - nmTokenState.applicationMasterKeys = - new HashMap(); + applicationMasterKeys = new HashMap(); containerTokenState = new RecoveredContainerTokensState(); - containerTokenState.activeTokens = new HashMap(); + activeTokens = new HashMap(); trackerStates = new HashMap(); deleteTasks = new HashMap(); logDeleterState = new HashMap(); @@ -86,13 +88,39 @@ protected void startStorage() { protected void closeStorage() { } + // Recovery Iterator Implementation. + private class NMMemoryRecoveryIterator implements RecoveryIterator { + + private Iterator it; + + NMMemoryRecoveryIterator(Iterator it){ + this.it = it; + } + + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public T next() throws IOException { + return it.next(); + } + + @Override + public void close() throws IOException { + + } + } @Override public synchronized RecoveredApplicationsState loadApplicationsState() throws IOException { RecoveredApplicationsState state = new RecoveredApplicationsState(); - state.applications = new ArrayList( - apps.values()); + List containerList = + new ArrayList(apps.values()); + state.it = new NMMemoryRecoveryIterator( + containerList.iterator()); return state; } @@ -111,13 +139,13 @@ public synchronized void removeApplication(ApplicationId appId) } @Override - public synchronized List loadContainersState() + public RecoveryIterator getContainerStateIterator() throws IOException { // return a copy so caller can't modify our state List result = new ArrayList(containerStates.size()); for (RecoveredContainerState rcs : containerStates.values()) { - RecoveredContainerState rcsCopy = new RecoveredContainerState(); + RecoveredContainerState rcsCopy = new RecoveredContainerState(rcs.getContainerId()); rcsCopy.status = rcs.status; rcsCopy.exitCode = rcs.exitCode; rcsCopy.killed = rcs.killed; @@ -130,13 +158,14 @@ public synchronized List loadContainersState() rcsCopy.setResourceMappings(rcs.getResourceMappings()); result.add(rcsCopy); } - return result; + return new NMMemoryRecoveryIterator( + result.iterator()); } @Override public synchronized void storeContainer(ContainerId containerId, int version, long startTime, StartContainerRequest startRequest) { - RecoveredContainerState rcs = new RecoveredContainerState(); + RecoveredContainerState rcs = new RecoveredContainerState(containerId); rcs.startRequest = startRequest; rcs.version = version; try { @@ -275,6 +304,8 @@ private TrackerState getTrackerState(TrackerKey key) { @Override public synchronized RecoveredLocalizationState loadLocalizationState() { RecoveredLocalizationState result = new RecoveredLocalizationState(); + Map userResources = + new HashMap(); for (Map.Entry e : trackerStates.entrySet()) { TrackerKey tk = e.getKey(); TrackerState ts = e.getValue(); @@ -285,10 +316,10 @@ public synchronized RecoveredLocalizationState loadLocalizationState() { if (tk.user == null) { result.publicTrackerState = loadTrackerState(ts); } else { - RecoveredUserResources rur = result.userResources.get(tk.user); + RecoveredUserResources rur = userResources.get(tk.user); if (rur == null) { rur = new RecoveredUserResources(); - result.userResources.put(tk.user, rur); + userResources.put(tk.user, rur); } if (tk.appId == null) { rur.privateTrackerState = loadTrackerState(ts); @@ -297,6 +328,8 @@ public synchronized RecoveredLocalizationState loadLocalizationState() { } } } + result.it = new NMMemoryRecoveryIterator>( + userResources.entrySet().iterator()); return result; } @@ -332,8 +365,10 @@ public synchronized RecoveredDeletionServiceState loadDeletionServiceState() throws IOException { RecoveredDeletionServiceState result = new RecoveredDeletionServiceState(); - result.tasks = new ArrayList( - deleteTasks.values()); + List deleteTaskProtos = + new ArrayList(deleteTasks.values()); + result.it = new NMMemoryRecoveryIterator( + deleteTaskProtos.iterator()); return result; } @@ -356,9 +391,10 @@ public synchronized RecoveredNMTokensState loadNMTokensState() RecoveredNMTokensState result = new RecoveredNMTokensState(); result.currentMasterKey = nmTokenState.currentMasterKey; result.previousMasterKey = nmTokenState.previousMasterKey; - result.applicationMasterKeys = - new HashMap( - nmTokenState.applicationMasterKeys); + Map masterKeysMap = + new HashMap(applicationMasterKeys); + result.it = new NMMemoryRecoveryIterator>( + masterKeysMap.entrySet().iterator()); return result; } @@ -380,14 +416,14 @@ public synchronized void storeNMTokenPreviousMasterKey(MasterKey key) public synchronized void storeNMTokenApplicationMasterKey( ApplicationAttemptId attempt, MasterKey key) throws IOException { MasterKeyPBImpl keypb = (MasterKeyPBImpl) key; - nmTokenState.applicationMasterKeys.put(attempt, + applicationMasterKeys.put(attempt, new MasterKeyPBImpl(keypb.getProto())); } @Override public synchronized void removeNMTokenApplicationMasterKey( ApplicationAttemptId attempt) throws IOException { - nmTokenState.applicationMasterKeys.remove(attempt); + applicationMasterKeys.remove(attempt); } @@ -399,8 +435,10 @@ public synchronized RecoveredContainerTokensState loadContainerTokensState() new RecoveredContainerTokensState(); result.currentMasterKey = containerTokenState.currentMasterKey; result.previousMasterKey = containerTokenState.previousMasterKey; - result.activeTokens = - new HashMap(containerTokenState.activeTokens); + Map containersTokenMap = + new HashMap(activeTokens); + result.it = new NMMemoryRecoveryIterator>( + containersTokenMap.entrySet().iterator()); return result; } @@ -423,13 +461,13 @@ public synchronized void storeContainerTokenPreviousMasterKey(MasterKey key) @Override public synchronized void storeContainerToken(ContainerId containerId, Long expirationTime) throws IOException { - containerTokenState.activeTokens.put(containerId, expirationTime); + activeTokens.put(containerId, expirationTime); } @Override public synchronized void removeContainerToken(ContainerId containerId) throws IOException { - containerTokenState.activeTokens.remove(containerId); + activeTokens.remove(containerId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index ea2cb2eed70..3f335845d91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -123,6 +123,73 @@ public void cleanup() throws IOException { FileUtil.fullyDelete(TMP_DIR); } + private List loadContainersState( + RecoveryIterator it) throws IOException { + List containers = + new ArrayList(); + while (it.hasNext()) { + RecoveredContainerState rcs = it.next(); + containers.add(rcs); + } + return containers; + } + + private List loadApplicationProtos( + RecoveryIterator it) + throws IOException { + List applicationProtos = + new ArrayList(); + while (it.hasNext()) { + applicationProtos.add(it.next()); + } + return applicationProtos; + } + + private List loadDeletionTaskProtos( + RecoveryIterator it) throws IOException { + List deleteTaskProtos = + new ArrayList(); + while (it.hasNext()) { + deleteTaskProtos.add(it.next()); + } + return deleteTaskProtos; + } + + private Map loadUserResources( + RecoveryIterator> it) + throws IOException { + Map userResources = + new HashMap(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + userResources.put(entry.getKey(), entry.getValue()); + } + return userResources; + } + + private Map loadNMTokens( + RecoveryIterator> it) + throws IOException { + Map nmTokens = + new HashMap(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + nmTokens.put(entry.getKey(), entry.getValue()); + } + return nmTokens; + } + + private Map loadContainerTokens( + RecoveryIterator> it) throws IOException { + Map containerTokens = + new HashMap(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + containerTokens.put(entry.getKey(), entry.getValue()); + } + return containerTokens; + } + private void restartStateStore() throws IOException { // need to close so leveldb releases database lock if (stateStore != null) { @@ -140,7 +207,7 @@ private void verifyEmptyState() throws IOException { assertNotNull(pubts); assertTrue(pubts.getLocalizedResources().isEmpty()); assertTrue(pubts.getInProgressResources().isEmpty()); - assertTrue(state.getUserResources().isEmpty()); + assertTrue(loadUserResources(state.getIterator()).isEmpty()); } @Test @@ -181,7 +248,7 @@ public void testCheckVersion() throws IOException { restartStateStore(); Assert.fail("Incompatible version, should expect fail here."); } catch (ServiceStateException e) { - Assert.assertTrue("Exception message mismatch", + Assert.assertTrue("Exception message mismatch", e.getMessage().contains("Incompatible version for NM state:")); } } @@ -190,7 +257,9 @@ public void testCheckVersion() throws IOException { public void testApplicationStorage() throws IOException { // test empty when no state RecoveredApplicationsState state = stateStore.loadApplicationsState(); - assertTrue(state.getApplications().isEmpty()); + List apps = + loadApplicationProtos(state.getIterator()); + assertTrue(apps.isEmpty()); // store an application and verify recovered final ApplicationId appId1 = ApplicationId.newInstance(1234, 1); @@ -202,8 +271,9 @@ public void testApplicationStorage() throws IOException { stateStore.storeApplication(appId1, appProto1); restartStateStore(); state = stateStore.loadApplicationsState(); - assertEquals(1, state.getApplications().size()); - assertEquals(appProto1, state.getApplications().get(0)); + apps = loadApplicationProtos(state.getIterator()); + assertEquals(1, apps.size()); + assertEquals(appProto1, apps.get(0)); // add a new app final ApplicationId appId2 = ApplicationId.newInstance(1234, 2); @@ -214,23 +284,25 @@ public void testApplicationStorage() throws IOException { stateStore.storeApplication(appId2, appProto2); restartStateStore(); state = stateStore.loadApplicationsState(); - assertEquals(2, state.getApplications().size()); - assertTrue(state.getApplications().contains(appProto1)); - assertTrue(state.getApplications().contains(appProto2)); + apps = loadApplicationProtos(state.getIterator()); + assertEquals(2, apps.size()); + assertTrue(apps.contains(appProto1)); + assertTrue(apps.contains(appProto2)); // test removing an application stateStore.removeApplication(appId2); restartStateStore(); state = stateStore.loadApplicationsState(); - assertEquals(1, state.getApplications().size()); - assertEquals(appProto1, state.getApplications().get(0)); + apps = loadApplicationProtos(state.getIterator()); + assertEquals(1, apps.size()); + assertEquals(appProto1, apps.get(0)); } @Test public void testContainerStorage() throws IOException { // test empty when no state List recoveredContainers = - stateStore.loadContainersState(); + loadContainersState(stateStore.getContainerStateIterator()); assertTrue(recoveredContainers.isEmpty()); // create a container request @@ -252,7 +324,8 @@ public void testContainerStorage() throws IOException { stateStore.getContainerVersionKey(containerId.toString())))); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); RecoveredContainerState rcs = recoveredContainers.get(0); assertEquals(0, rcs.getVersion()); @@ -267,14 +340,16 @@ public void testContainerStorage() throws IOException { // store a new container record without StartContainerRequest ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6); stateStore.storeContainerLaunched(containerId1); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); // check whether the new container record is discarded assertEquals(1, recoveredContainers.size()); // queue the container, and verify recovered stateStore.storeContainerQueued(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus()); @@ -290,7 +365,8 @@ public void testContainerStorage() throws IOException { diags.append("some diags for container"); stateStore.storeContainerDiagnostics(containerId, diags); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); @@ -303,7 +379,8 @@ public void testContainerStorage() throws IOException { // pause the container, and verify recovered stateStore.storeContainerPaused(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus()); @@ -314,7 +391,8 @@ public void testContainerStorage() throws IOException { // Resume the container stateStore.removeContainerPaused(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); // increase the container size, and verify recovered @@ -326,7 +404,8 @@ public void testContainerStorage() throws IOException { stateStore .storeContainerUpdateToken(containerId, updateTokenIdentifier); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(0, rcs.getVersion()); @@ -340,7 +419,8 @@ public void testContainerStorage() throws IOException { stateStore.storeContainerDiagnostics(containerId, diags); stateStore.storeContainerKilled(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); @@ -356,7 +436,8 @@ public void testContainerStorage() throws IOException { stateStore.storeContainerDiagnostics(containerId, diags); stateStore.storeContainerCompleted(containerId, 21); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus()); @@ -369,7 +450,8 @@ public void testContainerStorage() throws IOException { stateStore.storeContainerWorkDir(containerId, "/test/workdir"); stateStore.storeContainerLogDir(containerId, "/test/logdir"); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(6, rcs.getRemainingRetryAttempts()); @@ -379,7 +461,8 @@ public void testContainerStorage() throws IOException { // remove the container and verify not recovered stateStore.removeContainer(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertTrue(recoveredContainers.isEmpty()); } @@ -458,7 +541,7 @@ public void testStartResourceLocalization() throws IOException { assertTrue(pubts.getLocalizedResources().isEmpty()); assertTrue(pubts.getInProgressResources().isEmpty()); Map userResources = - state.getUserResources(); + loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); RecoveredUserResources rur = userResources.get(user); LocalResourceTrackerState privts = rur.getPrivateTrackerState(); @@ -512,7 +595,7 @@ public void testStartResourceLocalization() throws IOException { pubts.getInProgressResources().get(pubRsrcProto1)); assertEquals(pubRsrcLocalPath2, pubts.getInProgressResources().get(pubRsrcProto2)); - userResources = state.getUserResources(); + userResources = loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); rur = userResources.get(user); privts = rur.getPrivateTrackerState(); @@ -561,7 +644,7 @@ public void testFinishResourceLocalization() throws IOException { assertTrue(pubts.getLocalizedResources().isEmpty()); assertTrue(pubts.getInProgressResources().isEmpty()); Map userResources = - state.getUserResources(); + loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); RecoveredUserResources rur = userResources.get(user); LocalResourceTrackerState privts = rur.getPrivateTrackerState(); @@ -631,7 +714,7 @@ public void testFinishResourceLocalization() throws IOException { assertEquals(1, pubts.getInProgressResources().size()); assertEquals(pubRsrcLocalPath2, pubts.getInProgressResources().get(pubRsrcProto2)); - userResources = state.getUserResources(); + userResources = loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); rur = userResources.get(user); privts = rur.getPrivateTrackerState(); @@ -739,7 +822,7 @@ public void testRemoveLocalizedResource() throws IOException { assertEquals(pubLocalizedProto1, pubts.getLocalizedResources().iterator().next()); Map userResources = - state.getUserResources(); + loadUserResources(state.getIterator()); assertTrue(userResources.isEmpty()); } @@ -748,7 +831,9 @@ public void testDeletionTaskStorage() throws IOException { // test empty when no state RecoveredDeletionServiceState state = stateStore.loadDeletionServiceState(); - assertTrue(state.getTasks().isEmpty()); + List deleteTaskProtos = + loadDeletionTaskProtos(state.getIterator()); + assertTrue(deleteTaskProtos.isEmpty()); // store a deletion task and verify recovered DeletionServiceDeleteTaskProto proto = @@ -765,8 +850,9 @@ public void testDeletionTaskStorage() throws IOException { stateStore.storeDeletionTask(proto.getId(), proto); restartStateStore(); state = stateStore.loadDeletionServiceState(); - assertEquals(1, state.getTasks().size()); - assertEquals(proto, state.getTasks().get(0)); + deleteTaskProtos = loadDeletionTaskProtos(state.getIterator()); + assertEquals(1, deleteTaskProtos.size()); + assertEquals(proto, deleteTaskProtos.get(0)); // store another deletion task DeletionServiceDeleteTaskProto proto2 = @@ -779,31 +865,36 @@ public void testDeletionTaskStorage() throws IOException { stateStore.storeDeletionTask(proto2.getId(), proto2); restartStateStore(); state = stateStore.loadDeletionServiceState(); - assertEquals(2, state.getTasks().size()); - assertTrue(state.getTasks().contains(proto)); - assertTrue(state.getTasks().contains(proto2)); + deleteTaskProtos = loadDeletionTaskProtos(state.getIterator()); + assertEquals(2, deleteTaskProtos.size()); + assertTrue(deleteTaskProtos.contains(proto)); + assertTrue(deleteTaskProtos.contains(proto2)); + // delete a task and verify gone after recovery stateStore.removeDeletionTask(proto2.getId()); restartStateStore(); - state = stateStore.loadDeletionServiceState(); - assertEquals(1, state.getTasks().size()); - assertEquals(proto, state.getTasks().get(0)); + state = stateStore.loadDeletionServiceState(); + deleteTaskProtos = loadDeletionTaskProtos(state.getIterator()); + assertEquals(1, deleteTaskProtos.size()); + assertEquals(proto, deleteTaskProtos.get(0)); // delete the last task and verify none left stateStore.removeDeletionTask(proto.getId()); restartStateStore(); state = stateStore.loadDeletionServiceState(); - assertTrue(state.getTasks().isEmpty()); - } + deleteTaskProtos = loadDeletionTaskProtos(state.getIterator()); + assertTrue(deleteTaskProtos.isEmpty()); } @Test public void testNMTokenStorage() throws IOException { // test empty when no state RecoveredNMTokensState state = stateStore.loadNMTokensState(); + Map loadedAppKeys = + loadNMTokens(state.getIterator()); assertNull(state.getCurrentMasterKey()); assertNull(state.getPreviousMasterKey()); - assertTrue(state.getApplicationMasterKeys().isEmpty()); + assertTrue(loadedAppKeys.isEmpty()); // store a master key and verify recovered NMTokenSecretManagerForTest secretMgr = new NMTokenSecretManagerForTest(); @@ -811,18 +902,20 @@ public void testNMTokenStorage() throws IOException { stateStore.storeNMTokenCurrentMasterKey(currentKey); restartStateStore(); state = stateStore.loadNMTokensState(); + loadedAppKeys = loadNMTokens(state.getIterator()); assertEquals(currentKey, state.getCurrentMasterKey()); assertNull(state.getPreviousMasterKey()); - assertTrue(state.getApplicationMasterKeys().isEmpty()); + assertTrue(loadedAppKeys.isEmpty()); // store a previous key and verify recovered MasterKey prevKey = secretMgr.generateKey(); stateStore.storeNMTokenPreviousMasterKey(prevKey); restartStateStore(); state = stateStore.loadNMTokensState(); + loadedAppKeys = loadNMTokens(state.getIterator()); assertEquals(currentKey, state.getCurrentMasterKey()); assertEquals(prevKey, state.getPreviousMasterKey()); - assertTrue(state.getApplicationMasterKeys().isEmpty()); + assertTrue(loadedAppKeys.isEmpty()); // store a few application keys and verify recovered ApplicationAttemptId attempt1 = ApplicationAttemptId.newInstance( @@ -835,10 +928,9 @@ public void testNMTokenStorage() throws IOException { stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2); restartStateStore(); state = stateStore.loadNMTokensState(); + loadedAppKeys = loadNMTokens(state.getIterator()); assertEquals(currentKey, state.getCurrentMasterKey()); assertEquals(prevKey, state.getPreviousMasterKey()); - Map loadedAppKeys = - state.getApplicationMasterKeys(); assertEquals(2, loadedAppKeys.size()); assertEquals(attemptKey1, loadedAppKeys.get(attempt1)); assertEquals(attemptKey2, loadedAppKeys.get(attempt2)); @@ -857,9 +949,9 @@ public void testNMTokenStorage() throws IOException { stateStore.storeNMTokenCurrentMasterKey(currentKey); restartStateStore(); state = stateStore.loadNMTokensState(); + loadedAppKeys = loadNMTokens(state.getIterator()); assertEquals(currentKey, state.getCurrentMasterKey()); assertEquals(prevKey, state.getPreviousMasterKey()); - loadedAppKeys = state.getApplicationMasterKeys(); assertEquals(2, loadedAppKeys.size()); assertNull(loadedAppKeys.get(attempt1)); assertEquals(attemptKey2, loadedAppKeys.get(attempt2)); @@ -871,9 +963,10 @@ public void testContainerTokenStorage() throws IOException { // test empty when no state RecoveredContainerTokensState state = stateStore.loadContainerTokensState(); + Map loadedActiveTokens = loadContainerTokens(state.it); assertNull(state.getCurrentMasterKey()); assertNull(state.getPreviousMasterKey()); - assertTrue(state.getActiveTokens().isEmpty()); + assertTrue(loadedActiveTokens.isEmpty()); // store a master key and verify recovered ContainerTokenKeyGeneratorForTest keygen = @@ -882,18 +975,20 @@ public void testContainerTokenStorage() throws IOException { stateStore.storeContainerTokenCurrentMasterKey(currentKey); restartStateStore(); state = stateStore.loadContainerTokensState(); + loadedActiveTokens = loadContainerTokens(state.it); assertEquals(currentKey, state.getCurrentMasterKey()); assertNull(state.getPreviousMasterKey()); - assertTrue(state.getActiveTokens().isEmpty()); + assertTrue(loadedActiveTokens.isEmpty()); // store a previous key and verify recovered MasterKey prevKey = keygen.generateKey(); stateStore.storeContainerTokenPreviousMasterKey(prevKey); restartStateStore(); state = stateStore.loadContainerTokensState(); + loadedActiveTokens = loadContainerTokens(state.it); assertEquals(currentKey, state.getCurrentMasterKey()); assertEquals(prevKey, state.getPreviousMasterKey()); - assertTrue(state.getActiveTokens().isEmpty()); + assertTrue(loadedActiveTokens.isEmpty()); // store a few container tokens and verify recovered ContainerId cid1 = BuilderUtils.newContainerId(1, 1, 1, 1); @@ -904,10 +999,9 @@ public void testContainerTokenStorage() throws IOException { stateStore.storeContainerToken(cid2, expTime2); restartStateStore(); state = stateStore.loadContainerTokensState(); + loadedActiveTokens = loadContainerTokens(state.it); assertEquals(currentKey, state.getCurrentMasterKey()); assertEquals(prevKey, state.getPreviousMasterKey()); - Map loadedActiveTokens = - state.getActiveTokens(); assertEquals(2, loadedActiveTokens.size()); assertEquals(expTime1, loadedActiveTokens.get(cid1)); assertEquals(expTime2, loadedActiveTokens.get(cid2)); @@ -925,9 +1019,9 @@ public void testContainerTokenStorage() throws IOException { stateStore.storeContainerTokenCurrentMasterKey(currentKey); restartStateStore(); state = stateStore.loadContainerTokensState(); + loadedActiveTokens = loadContainerTokens(state.it); assertEquals(currentKey, state.getCurrentMasterKey()); assertEquals(prevKey, state.getPreviousMasterKey()); - loadedActiveTokens = state.getActiveTokens(); assertEquals(2, loadedActiveTokens.size()); assertNull(loadedActiveTokens.get(cid1)); assertEquals(expTime2, loadedActiveTokens.get(cid2)); @@ -1006,8 +1100,8 @@ protected DB openDatabase(Configuration conf) { @Test public void testUnexpectedKeyDoesntThrowException() throws IOException { // test empty when no state - List recoveredContainers = stateStore - .loadContainersState(); + List recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertTrue(recoveredContainers.isEmpty()); ApplicationId appId = ApplicationId.newInstance(1234, 3); @@ -1022,7 +1116,8 @@ public void testUnexpectedKeyDoesntThrowException() throws IOException { + containerId.toString() + "/invalidKey1234").getBytes(); stateStore.getDB().put(invalidKey, new byte[1]); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); RecoveredContainerState rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus()); @@ -1139,8 +1234,8 @@ public void testAMRMProxyStorage() throws IOException { @Test public void testStateStoreForResourceMapping() throws IOException { // test empty when no state - List recoveredContainers = stateStore - .loadContainersState(); + List recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertTrue(recoveredContainers.isEmpty()); ApplicationId appId = ApplicationId.newInstance(1234, 3); @@ -1168,7 +1263,8 @@ public void testStateStoreForResourceMapping() throws IOException { // add a invalid key restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); RecoveredContainerState rcs = recoveredContainers.get(0); List res = rcs.getResourceMappings()