diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java index ae81dc11187..e665c5ad2b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java @@ -19,13 +19,14 @@ package org.apache.hadoop.yarn.server.nodemanager; import static java.util.concurrent.TimeUnit.SECONDS; + +import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -96,16 +97,20 @@ public class DeletionService extends AbstractService { private void recover(NMStateStoreService.RecoveredDeletionServiceState state) throws IOException { - List taskProtos = state.getTasks(); Map idToInfoMap = - new HashMap<>(taskProtos.size()); - Set successorTasks = new HashSet<>(); - for (DeletionServiceDeleteTaskProto proto : taskProtos) { - DeletionTaskRecoveryInfo info = - NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this); - idToInfoMap.put(info.getTask().getTaskId(), info); - nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId())); - successorTasks.addAll(info.getSuccessorTaskIds()); + new HashMap(); + Set successorTasks = new HashSet(); + + try (RecoveryIterator it = + state.getIterator()) { + while (it.hasNext()) { + DeletionServiceDeleteTaskProto proto = it.next(); + DeletionTaskRecoveryInfo info = + NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this); + idToInfoMap.put(info.getTask().getTaskId(), info); + nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId())); + successorTasks.addAll(info.getSuccessorTaskIds()); + } } // restore the task dependencies and schedule the deletion tasks that diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 8b3525820cf..b89e2ddd06b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -23,6 +23,7 @@ import com.google.protobuf.ByteString; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent; +import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -356,19 +357,26 @@ public class ContainerManagerImpl extends CompositeService implements stateStore.loadLocalizationState()); RecoveredApplicationsState appsState = stateStore.loadApplicationsState(); - for (ContainerManagerApplicationProto proto : - appsState.getApplications()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Recovering application with state: " + proto.toString()); + try (RecoveryIterator rasIterator = + appsState.getIterator()) { + while (rasIterator.hasNext()) { + ContainerManagerApplicationProto proto = rasIterator.next(); + if (LOG.isDebugEnabled()) { + LOG.debug("Recovering application with state: " + proto.toString()); + } + recoverApplication(proto); } - recoverApplication(proto); } - for (RecoveredContainerState rcs : stateStore.loadContainersState()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Recovering container with state: " + rcs); + try (RecoveryIterator rcsIterator = + stateStore.getContainerStateIterator()) { + while (rcsIterator.hasNext()) { + RecoveredContainerState rcs = rcsIterator.next(); + if (LOG.isDebugEnabled()) { + LOG.debug("Recovering container with state: " + rcs); + } + recoverContainer(rcs); } - recoverContainer(rcs); } // Recovery AMRMProxy state after apps and containers are recovered diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 3834ecec6f9..2892d1fab47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; + +import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -295,42 +297,46 @@ public class ResourceLocalizationService extends CompositeService //Recover localized resources after an NM restart public void recoverLocalizedResources(RecoveredLocalizationState state) - throws URISyntaxException { + throws URISyntaxException, IOException { LocalResourceTrackerState trackerState = state.getPublicTrackerState(); recoverTrackerResources(publicRsrc, trackerState); - for (Map.Entry userEntry : - state.getUserResources().entrySet()) { - String user = userEntry.getKey(); - RecoveredUserResources userResources = userEntry.getValue(); - trackerState = userResources.getPrivateTrackerState(); - if (!trackerState.isEmpty()) { - LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, - null, dispatcher, true, super.getConfig(), stateStore, dirsHandler); - LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user, - tracker); - if (oldTracker != null) { - tracker = oldTracker; - } - recoverTrackerResources(tracker, trackerState); - } - - for (Map.Entry appEntry : - userResources.getAppTrackerStates().entrySet()) { - trackerState = appEntry.getValue(); + try (RecoveryIterator> it + = state.getIterator()) { + while (it.hasNext()) { + Map.Entry userEntry = it.next(); + String user = userEntry.getKey(); + RecoveredUserResources userResources = userEntry.getValue(); + trackerState = userResources.getPrivateTrackerState(); if (!trackerState.isEmpty()) { - ApplicationId appId = appEntry.getKey(); - String appIdStr = appId.toString(); LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, - appId, dispatcher, false, super.getConfig(), stateStore, + null, dispatcher, true, super.getConfig(), stateStore, dirsHandler); - LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr, + LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user, tracker); if (oldTracker != null) { tracker = oldTracker; } recoverTrackerResources(tracker, trackerState); } + + for (Map.Entry appEntry : + userResources.getAppTrackerStates().entrySet()) { + trackerState = appEntry.getValue(); + if (!trackerState.isEmpty()) { + ApplicationId appId = appEntry.getKey(); + String appIdStr = appId.toString(); + LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, + appId, dispatcher, false, super.getConfig(), stateStore, + dirsHandler); + LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr, + tracker); + if (oldTracker != null) { + tracker = oldTracker; + } + recoverTrackerResources(tracker, trackerState); + } + } } } } @@ -556,7 +562,7 @@ public class ResourceLocalizationService extends CompositeService rsrcCleanup.getResources(); for (Map.Entry> e : rsrcs.entrySet()) { - LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(), + LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(), c.getContainerId().getApplicationAttemptId() .getApplicationId()); for (LocalResourceRequest req : e.getValue()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 67f642df3e8..5d4253db9df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -66,6 +66,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.io.Serializable; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -73,6 +74,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.NoSuchElementException; import java.util.Set; import java.util.Timer; import java.util.TimerTask; @@ -225,68 +227,119 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { return isHealthy; } - @Override - public List loadContainersState() + // LeveldbIterator starting at startkey + private LeveldbIterator getLevelDBIterator(String startKey) throws IOException { - ArrayList containers = - new ArrayList(); - ArrayList containersToRemove = - new ArrayList(); - LeveldbIterator iter = null; try { - iter = new LeveldbIterator(db); - iter.seek(bytes(CONTAINERS_KEY_PREFIX)); + LeveldbIterator it = new LeveldbIterator(db); + it.seek(bytes(startKey)); + return it; + } catch (DBException e) { + throw new IOException(e); + } + } - while (iter.hasNext()) { - Entry entry = iter.peekNext(); + // Base Recovery Iterator + private abstract class BaseRecoveryIterator implements + RecoveryIterator { + LeveldbIterator it; + T nextItem; + + BaseRecoveryIterator(String dbKey) throws IOException { + this.it = getLevelDBIterator(dbKey); + this.nextItem = null; + } + + protected abstract T getNextItem(LeveldbIterator it) throws IOException; + + @Override + public boolean hasNext() throws IOException { + if (nextItem == null) { + nextItem = getNextItem(it); + } + return (nextItem != null); + } + + @Override + public T next() throws IOException, NoSuchElementException { + T tmp = nextItem; + if (tmp != null) { + nextItem = null; + return tmp; + } else { + tmp = getNextItem(it); + if (tmp == null) { + throw new NoSuchElementException(); + } + return tmp; + } + } + + @Override + public void close() throws IOException { + if (it != null) { + it.close(); + } + } + } + + // Container Recovery Iterator + private class ContainerStateIterator extends + BaseRecoveryIterator { + ContainerStateIterator() throws IOException { + super(CONTAINERS_KEY_PREFIX); + } + + @Override + protected RecoveredContainerState getNextItem(LeveldbIterator it) + throws IOException { + return getNextRecoveredContainer(it); + } + } + + private RecoveredContainerState getNextRecoveredContainer(LeveldbIterator it) + throws IOException { + RecoveredContainerState rcs = null; + try { + while (it.hasNext()) { + Entry entry = it.peekNext(); String key = asString(entry.getKey()); if (!key.startsWith(CONTAINERS_KEY_PREFIX)) { - break; + return null; } int idEndPos = key.indexOf('/', CONTAINERS_KEY_PREFIX.length()); if (idEndPos < 0) { throw new IOException("Unable to determine container in key: " + key); } - ContainerId containerId = ContainerId.fromString( - key.substring(CONTAINERS_KEY_PREFIX.length(), idEndPos)); - String keyPrefix = key.substring(0, idEndPos+1); - RecoveredContainerState rcs = loadContainerState(containerId, - iter, keyPrefix); - // Don't load container without StartContainerRequest + String keyPrefix = key.substring(0, idEndPos + 1); + rcs = loadContainerState(it, keyPrefix); if (rcs.startRequest != null) { - containers.add(rcs); + break; } else { - containersToRemove.add(containerId); + removeContainer(rcs.getContainerId()); + rcs = null; } } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } - - // remove container without StartContainerRequest - for (ContainerId containerId : containersToRemove) { - LOG.warn("Remove container " + containerId + - " with incomplete records"); - try { - removeContainer(containerId); - // TODO: kill and cleanup the leaked container - } catch (IOException e) { - LOG.error("Unable to remove container " + containerId + - " in store", e); - } - } - - return containers; + return rcs; } - private RecoveredContainerState loadContainerState(ContainerId containerId, - LeveldbIterator iter, String keyPrefix) throws IOException { - RecoveredContainerState rcs = new RecoveredContainerState(); + + @Override + public RecoveryIterator getContainerStateIterator() + throws IOException { + return new ContainerStateIterator(); + } + + private RecoveredContainerState loadContainerState(LeveldbIterator iter, + String keyPrefix) throws IOException { + ContainerId containerId = ContainerId.fromString( + keyPrefix.substring(CONTAINERS_KEY_PREFIX.length(), + keyPrefix.length()-1)); + RecoveredContainerState rcs = new RecoveredContainerState(containerId); rcs.status = RecoveredContainerStatus.REQUESTED; while (iter.hasNext()) { Entry entry = iter.peekNext(); @@ -680,35 +733,45 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } + // Application Recovery Iterator + private class ApplicationStateIterator extends + BaseRecoveryIterator { + ApplicationStateIterator() throws IOException { + super(APPLICATIONS_KEY_PREFIX); + } + + @Override + protected ContainerManagerApplicationProto getNextItem(LeveldbIterator it) + throws IOException { + return getNextRecoveredApplication(it); + } + } + + private ContainerManagerApplicationProto getNextRecoveredApplication( + LeveldbIterator it) throws IOException { + ContainerManagerApplicationProto applicationProto = null; + try { + if (it.hasNext()) { + Entry entry = it.next(); + String key = asString(entry.getKey()); + if (!key.startsWith(APPLICATIONS_KEY_PREFIX)) { + return null; + } + applicationProto = ContainerManagerApplicationProto.parseFrom( + entry.getValue()); + } + } catch (DBException e) { + throw new IOException(e); + } + return applicationProto; + } + @Override public RecoveredApplicationsState loadApplicationsState() throws IOException { RecoveredApplicationsState state = new RecoveredApplicationsState(); - state.applications = new ArrayList(); - String keyPrefix = APPLICATIONS_KEY_PREFIX; - LeveldbIterator iter = null; - try { - iter = new LeveldbIterator(db); - iter.seek(bytes(keyPrefix)); - while (iter.hasNext()) { - Entry entry = iter.next(); - String key = asString(entry.getKey()); - if (!key.startsWith(keyPrefix)) { - break; - } - state.applications.add( - ContainerManagerApplicationProto.parseFrom(entry.getValue())); - } - } catch (DBException e) { - throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } - } - + state.it = new ApplicationStateIterator(); cleanupDeprecatedFinishedApps(); - return state; } @@ -752,24 +815,29 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } - @Override - public RecoveredLocalizationState loadLocalizationState() - throws IOException { - RecoveredLocalizationState state = new RecoveredLocalizationState(); + // User Resource Recovery Iterator. + private class UserResourcesIterator extends + BaseRecoveryIterator> { + UserResourcesIterator() throws IOException { + super(LOCALIZATION_PRIVATE_KEY_PREFIX); + } - LeveldbIterator iter = null; + @Override + protected Entry getNextItem( + LeveldbIterator it) throws IOException { + return getNextRecoveredPrivateLocalizationEntry(it); + } + } + + private Entry getNextRecoveredPrivateLocalizationEntry( + LeveldbIterator it) throws IOException { + Entry localEntry = null; try { - iter = new LeveldbIterator(db); - iter.seek(bytes(LOCALIZATION_PUBLIC_KEY_PREFIX)); - state.publicTrackerState = loadResourceTrackerState(iter, - LOCALIZATION_PUBLIC_KEY_PREFIX); - - iter.seek(bytes(LOCALIZATION_PRIVATE_KEY_PREFIX)); - while (iter.hasNext()) { - Entry entry = iter.peekNext(); + if (it.hasNext()) { + Entry entry = it.peekNext(); String key = asString(entry.getKey()); if (!key.startsWith(LOCALIZATION_PRIVATE_KEY_PREFIX)) { - break; + return null; } int userEndPos = key.indexOf('/', @@ -780,17 +848,24 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } String user = key.substring( LOCALIZATION_PRIVATE_KEY_PREFIX.length(), userEndPos); - state.userResources.put(user, loadUserLocalizedResources(iter, - key.substring(0, userEndPos+1))); + RecoveredUserResources val = loadUserLocalizedResources(it, + key.substring(0, userEndPos+1)); + localEntry = new AbstractMap.SimpleEntry<>(user, val); } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } + return localEntry; + } + @Override + public RecoveredLocalizationState loadLocalizationState() + throws IOException { + RecoveredLocalizationState state = new RecoveredLocalizationState(); + LeveldbIterator it = getLevelDBIterator(LOCALIZATION_PUBLIC_KEY_PREFIX); + state.publicTrackerState = loadResourceTrackerState(it, + LOCALIZATION_PUBLIC_KEY_PREFIX); + state.it = new UserResourcesIterator(); return state; } @@ -800,7 +875,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { final String startedPrefix = keyPrefix + LOCALIZATION_STARTED_SUFFIX; LocalResourceTrackerState state = new LocalResourceTrackerState(); while (iter.hasNext()) { - Entry entry = iter.peekNext(); + Entry entry = iter.peekNext(); String key = asString(entry.getKey()); if (!key.startsWith(keyPrefix)) { break; @@ -981,32 +1056,44 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { + LOCALIZATION_APPCACHE_SUFFIX + appId + "/"; } + // Deletion State Recovery Iterator. + private class DeletionStateIterator extends + BaseRecoveryIterator { + DeletionStateIterator() throws IOException { + super(DELETION_TASK_KEY_PREFIX); + } + + @Override + protected DeletionServiceDeleteTaskProto getNextItem(LeveldbIterator it) + throws IOException { + return getNextRecoveredDeletionService(it); + } + } + + private DeletionServiceDeleteTaskProto getNextRecoveredDeletionService( + LeveldbIterator it) throws IOException { + DeletionServiceDeleteTaskProto deleteProto = null; + try { + if (it.hasNext()) { + Entry entry = it.next(); + String key = asString(entry.getKey()); + if (!key.startsWith(DELETION_TASK_KEY_PREFIX)) { + return null; + } + deleteProto = DeletionServiceDeleteTaskProto.parseFrom( + entry.getValue()); + } + } catch (DBException e) { + throw new IOException(e); + } + return deleteProto; + } @Override public RecoveredDeletionServiceState loadDeletionServiceState() throws IOException { RecoveredDeletionServiceState state = new RecoveredDeletionServiceState(); - state.tasks = new ArrayList(); - LeveldbIterator iter = null; - try { - iter = new LeveldbIterator(db); - iter.seek(bytes(DELETION_TASK_KEY_PREFIX)); - while (iter.hasNext()) { - Entry entry = iter.next(); - String key = asString(entry.getKey()); - if (!key.startsWith(DELETION_TASK_KEY_PREFIX)) { - break; - } - state.tasks.add( - DeletionServiceDeleteTaskProto.parseFrom(entry.getValue())); - } - } catch (DBException e) { - throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } - } + state.it = new DeletionStateIterator(); return state; } @@ -1033,29 +1120,44 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } } + private MasterKey getMasterKey(String dbKey) throws IOException { + try{ + byte[] data = db.get(bytes(dbKey)); + if (data == null || data.length == 0) { + return null; + } + return parseMasterKey(data); + } catch (DBException e) { + throw new IOException(e); + } + } - @Override - public RecoveredNMTokensState loadNMTokensState() throws IOException { - RecoveredNMTokensState state = new RecoveredNMTokensState(); - state.applicationMasterKeys = - new HashMap(); - LeveldbIterator iter = null; + // Recover NMTokens Iterator + private class NMTokensStateIterator extends + BaseRecoveryIterator> { + NMTokensStateIterator() throws IOException { + super(NM_TOKENS_KEY_PREFIX); + } + + @Override + protected Entry getNextItem( + LeveldbIterator it) throws IOException { + return getNextMasterKeyEntry(it); + } + } + + private Entry getNextMasterKeyEntry( + LeveldbIterator it) throws IOException { + Entry masterKeyentry = null; try { - iter = new LeveldbIterator(db); - iter.seek(bytes(NM_TOKENS_KEY_PREFIX)); - while (iter.hasNext()) { - Entry entry = iter.next(); + while (it.hasNext()) { + Entry entry = it.next(); String fullKey = asString(entry.getKey()); if (!fullKey.startsWith(NM_TOKENS_KEY_PREFIX)) { break; } String key = fullKey.substring(NM_TOKENS_KEY_PREFIX.length()); - if (key.equals(CURRENT_MASTER_KEY_SUFFIX)) { - state.currentMasterKey = parseMasterKey(entry.getValue()); - } else if (key.equals(PREV_MASTER_KEY_SUFFIX)) { - state.previousMasterKey = parseMasterKey(entry.getValue()); - } else if (key.startsWith( - ApplicationAttemptId.appAttemptIdStrPrefix)) { + if (key.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { ApplicationAttemptId attempt; try { attempt = ApplicationAttemptId.fromString(key); @@ -1063,17 +1165,25 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { throw new IOException("Bad application master key state for " + fullKey, e); } - state.applicationMasterKeys.put(attempt, + masterKeyentry = new AbstractMap.SimpleEntry<>(attempt, parseMasterKey(entry.getValue())); + break; } } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } + return masterKeyentry; + } + + @Override + public RecoveredNMTokensState loadNMTokensState() throws IOException { + RecoveredNMTokensState state = new RecoveredNMTokensState(); + state.currentMasterKey = getMasterKey(NM_TOKENS_KEY_PREFIX + + CURRENT_MASTER_KEY_SUFFIX); + state.previousMasterKey = getMasterKey(NM_TOKENS_KEY_PREFIX + + PREV_MASTER_KEY_SUFFIX); + state.it = new NMTokensStateIterator(); return state; } @@ -1122,45 +1232,45 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } } + // Recover ContainersToken Iterator. + private class ContainerTokensStateIterator extends + BaseRecoveryIterator> { + ContainerTokensStateIterator() throws IOException { + super(CONTAINER_TOKENS_KEY_PREFIX); + } - @Override - public RecoveredContainerTokensState loadContainerTokensState() + @Override + protected Entry getNextItem(LeveldbIterator it) + throws IOException { + return getNextContainerToken(it); + } + } + + private Entry getNextContainerToken(LeveldbIterator it) throws IOException { - RecoveredContainerTokensState state = new RecoveredContainerTokensState(); - state.activeTokens = new HashMap(); - LeveldbIterator iter = null; + Entry containerTokenEntry = null; try { - iter = new LeveldbIterator(db); - iter.seek(bytes(CONTAINER_TOKENS_KEY_PREFIX)); - final int containerTokensKeyPrefixLength = - CONTAINER_TOKENS_KEY_PREFIX.length(); - while (iter.hasNext()) { - Entry entry = iter.next(); + while (it.hasNext()) { + Entry entry = it.next(); String fullKey = asString(entry.getKey()); if (!fullKey.startsWith(CONTAINER_TOKENS_KEY_PREFIX)) { break; } - String key = fullKey.substring(containerTokensKeyPrefixLength); - if (key.equals(CURRENT_MASTER_KEY_SUFFIX)) { - state.currentMasterKey = parseMasterKey(entry.getValue()); - } else if (key.equals(PREV_MASTER_KEY_SUFFIX)) { - state.previousMasterKey = parseMasterKey(entry.getValue()); - } else if (key.startsWith(ConverterUtils.CONTAINER_PREFIX)) { - loadContainerToken(state, fullKey, key, entry.getValue()); + String key = fullKey.substring(CONTAINER_TOKENS_KEY_PREFIX.length()); + if (key.startsWith(ConverterUtils.CONTAINER_PREFIX)) { + containerTokenEntry = loadContainerToken(fullKey, key, + entry.getValue()); + break; } } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } - return state; + return containerTokenEntry; } - private static void loadContainerToken(RecoveredContainerTokensState state, - String key, String containerIdStr, byte[] value) throws IOException { + private static Entry loadContainerToken(String key, + String containerIdStr, byte[] value) throws IOException { ContainerId containerId; Long expTime; try { @@ -1169,7 +1279,19 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } catch (IllegalArgumentException e) { throw new IOException("Bad container token state for " + key, e); } - state.activeTokens.put(containerId, expTime); + return new AbstractMap.SimpleEntry<>(containerId, expTime); + } + + @Override + public RecoveredContainerTokensState loadContainerTokensState() + throws IOException { + RecoveredContainerTokensState state = new RecoveredContainerTokensState(); + state.currentMasterKey = getMasterKey(CONTAINER_TOKENS_KEY_PREFIX + + CURRENT_MASTER_KEY_SUFFIX); + state.previousMasterKey = getMasterKey(CONTAINER_TOKENS_KEY_PREFIX + + PREV_MASTER_KEY_SUFFIX); + state.it = new ContainerTokensStateIterator(); + return state; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index dfad9cfee33..3ae00f72a94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -65,7 +65,7 @@ public class NMNullStateStoreService extends NMStateStoreService { } @Override - public List loadContainersState() + public RecoveryIterator getContainerStateIterator() throws IOException { throw new UnsupportedOperationException( "Recovery not supported by this state store"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 70decdba743..35caec9a479 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -67,12 +68,11 @@ public abstract class NMStateStoreService extends AbstractService { } public static class RecoveredApplicationsState { - List applications; + RecoveryIterator it = null; - public List getApplications() { - return applications; + public RecoveryIterator getIterator() { + return it; } - } /** @@ -106,6 +106,15 @@ public abstract class NMStateStoreService extends AbstractService { RecoveredContainerType.RECOVER; private long startTime; private ResourceMappings resMappings = new ResourceMappings(); + private final ContainerId containerId; + + RecoveredContainerState(ContainerId containerId){ + this.containerId = containerId; + } + + public ContainerId getContainerId() { + return containerId; + } public RecoveredContainerStatus getStatus() { return status; @@ -248,30 +257,33 @@ public abstract class NMStateStoreService extends AbstractService { public static class RecoveredLocalizationState { LocalResourceTrackerState publicTrackerState = new LocalResourceTrackerState(); - Map userResources = - new HashMap(); + RecoveryIterator> it = null; public LocalResourceTrackerState getPublicTrackerState() { return publicTrackerState; } - public Map getUserResources() { - return userResources; + public RecoveryIterator> getIterator() { + return it; } } public static class RecoveredDeletionServiceState { - List tasks; + RecoveryIterator it = null; - public List getTasks() { - return tasks; + public RecoveryIterator getIterator(){ + return it; } } public static class RecoveredNMTokensState { MasterKey currentMasterKey; MasterKey previousMasterKey; - Map applicationMasterKeys; + RecoveryIterator> it = null; + + public RecoveryIterator> getIterator() { + return it; + } public MasterKey getCurrentMasterKey() { return currentMasterKey; @@ -281,15 +293,16 @@ public abstract class NMStateStoreService extends AbstractService { return previousMasterKey; } - public Map getApplicationMasterKeys() { - return applicationMasterKeys; - } } public static class RecoveredContainerTokensState { MasterKey currentMasterKey; MasterKey previousMasterKey; - Map activeTokens; + RecoveryIterator> it = null; + + public RecoveryIterator> getIterator() { + return it; + } public MasterKey getCurrentMasterKey() { return currentMasterKey; @@ -299,9 +312,6 @@ public abstract class NMStateStoreService extends AbstractService { return previousMasterKey; } - public Map getActiveTokens() { - return activeTokens; - } } public static class RecoveredLogDeleterState { @@ -400,11 +410,10 @@ public abstract class NMStateStoreService extends AbstractService { /** - * Load the state of containers - * @return recovered state for containers - * @throws IOException + * get the Recovered Container State Iterator + * @return recovery iterator */ - public abstract List loadContainersState() + public abstract RecoveryIterator getContainerStateIterator() throws IOException; /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java new file mode 100644 index 00000000000..0bb262a6b53 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.recovery; + +import java.io.Closeable; +import java.io.IOException; +import java.util.NoSuchElementException; + +/** + * A wrapper for a Iterator to translate the raw RuntimeExceptions that + * can be thrown into IOException. + */ +public interface RecoveryIterator extends Closeable { + + /** + * Returns true if the iteration has more elements. + */ + boolean hasNext() throws IOException; + + /** + * Returns the next element in the iteration. + */ + T next() throws IOException, NoSuchElementException; + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java index 256f649cabf..b3df69b5554 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java @@ -24,6 +24,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map.Entry; import java.util.TreeMap; + +import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,17 +92,20 @@ public class NMContainerTokenSecretManager extends super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1; } - for (Entry entry : state.getActiveTokens().entrySet()) { - ContainerId containerId = entry.getKey(); - Long expTime = entry.getValue(); - List containerList = - recentlyStartedContainerTracker.get(expTime); - if (containerList == null) { - containerList = new ArrayList(); - recentlyStartedContainerTracker.put(expTime, containerList); - } - if (!containerList.contains(containerId)) { - containerList.add(containerId); + try (RecoveryIterator> it = state.getIterator()) { + while (it.hasNext()) { + Entry entry = it.next(); + ContainerId containerId = entry.getKey(); + Long expTime = entry.getValue(); + List containerList = + recentlyStartedContainerTracker.get(expTime); + if (containerList == null) { + containerList = new ArrayList(); + recentlyStartedContainerTracker.put(expTime, containerList); + } + if (!containerList.contains(containerId)) { + containerList.add(containerId); + } } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java index 0956e77f7fa..f8957915677 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java @@ -23,6 +23,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; + +import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,11 +89,14 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager { super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1; } - for (Map.Entry entry : - state.getApplicationMasterKeys().entrySet()) { - key = entry.getValue(); - oldMasterKeys.put(entry.getKey(), - new MasterKeyData(key, createSecretKey(key.getBytes().array()))); + try (RecoveryIterator> it = + state.getIterator()) { + while (it.hasNext()) { + Map.Entry entry = it.next(); + key = entry.getValue(); + oldMasterKeys.put(entry.getKey(), + new MasterKeyData(key, createSecretKey(key.getBytes().array()))); + } } // reconstruct app to app attempts map diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index c5428d184b7..9658ecdf635 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -23,6 +23,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -56,6 +57,8 @@ public class NMMemoryStateStoreService extends NMStateStoreService { private Map deleteTasks; private RecoveredNMTokensState nmTokenState; private RecoveredContainerTokensState containerTokenState; + private Map applicationMasterKeys; + private Map activeTokens; private Map logDeleterState; private RecoveredAMRMProxyState amrmProxyState; @@ -68,10 +71,9 @@ public class NMMemoryStateStoreService extends NMStateStoreService { apps = new HashMap(); containerStates = new HashMap(); nmTokenState = new RecoveredNMTokensState(); - nmTokenState.applicationMasterKeys = - new HashMap(); + applicationMasterKeys = new HashMap(); containerTokenState = new RecoveredContainerTokensState(); - containerTokenState.activeTokens = new HashMap(); + activeTokens = new HashMap(); trackerStates = new HashMap(); deleteTasks = new HashMap(); logDeleterState = new HashMap(); @@ -86,13 +88,39 @@ public class NMMemoryStateStoreService extends NMStateStoreService { protected void closeStorage() { } + // Recovery Iterator Implementation. + private class NMMemoryRecoveryIterator implements RecoveryIterator { + + private Iterator it; + + NMMemoryRecoveryIterator(Iterator it){ + this.it = it; + } + + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public T next() throws IOException { + return it.next(); + } + + @Override + public void close() throws IOException { + + } + } @Override public synchronized RecoveredApplicationsState loadApplicationsState() throws IOException { RecoveredApplicationsState state = new RecoveredApplicationsState(); - state.applications = new ArrayList( - apps.values()); + List containerList = + new ArrayList(apps.values()); + state.it = new NMMemoryRecoveryIterator( + containerList.iterator()); return state; } @@ -111,13 +139,13 @@ public class NMMemoryStateStoreService extends NMStateStoreService { } @Override - public synchronized List loadContainersState() + public RecoveryIterator getContainerStateIterator() throws IOException { // return a copy so caller can't modify our state List result = new ArrayList(containerStates.size()); for (RecoveredContainerState rcs : containerStates.values()) { - RecoveredContainerState rcsCopy = new RecoveredContainerState(); + RecoveredContainerState rcsCopy = new RecoveredContainerState(rcs.getContainerId()); rcsCopy.status = rcs.status; rcsCopy.exitCode = rcs.exitCode; rcsCopy.killed = rcs.killed; @@ -131,13 +159,14 @@ public class NMMemoryStateStoreService extends NMStateStoreService { rcsCopy.setResourceMappings(rcs.getResourceMappings()); result.add(rcsCopy); } - return result; + return new NMMemoryRecoveryIterator( + result.iterator()); } @Override public synchronized void storeContainer(ContainerId containerId, int version, long startTime, StartContainerRequest startRequest) { - RecoveredContainerState rcs = new RecoveredContainerState(); + RecoveredContainerState rcs = new RecoveredContainerState(containerId); rcs.startRequest = startRequest; rcs.version = version; try { @@ -284,6 +313,8 @@ public class NMMemoryStateStoreService extends NMStateStoreService { @Override public synchronized RecoveredLocalizationState loadLocalizationState() { RecoveredLocalizationState result = new RecoveredLocalizationState(); + Map userResources = + new HashMap(); for (Map.Entry e : trackerStates.entrySet()) { TrackerKey tk = e.getKey(); TrackerState ts = e.getValue(); @@ -294,10 +325,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService { if (tk.user == null) { result.publicTrackerState = loadTrackerState(ts); } else { - RecoveredUserResources rur = result.userResources.get(tk.user); + RecoveredUserResources rur = userResources.get(tk.user); if (rur == null) { rur = new RecoveredUserResources(); - result.userResources.put(tk.user, rur); + userResources.put(tk.user, rur); } if (tk.appId == null) { rur.privateTrackerState = loadTrackerState(ts); @@ -306,6 +337,8 @@ public class NMMemoryStateStoreService extends NMStateStoreService { } } } + result.it = new NMMemoryRecoveryIterator>( + userResources.entrySet().iterator()); return result; } @@ -341,8 +374,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService { throws IOException { RecoveredDeletionServiceState result = new RecoveredDeletionServiceState(); - result.tasks = new ArrayList( - deleteTasks.values()); + List deleteTaskProtos = + new ArrayList(deleteTasks.values()); + result.it = new NMMemoryRecoveryIterator( + deleteTaskProtos.iterator()); return result; } @@ -365,9 +400,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService { RecoveredNMTokensState result = new RecoveredNMTokensState(); result.currentMasterKey = nmTokenState.currentMasterKey; result.previousMasterKey = nmTokenState.previousMasterKey; - result.applicationMasterKeys = - new HashMap( - nmTokenState.applicationMasterKeys); + Map masterKeysMap = + new HashMap(applicationMasterKeys); + result.it = new NMMemoryRecoveryIterator>( + masterKeysMap.entrySet().iterator()); return result; } @@ -389,14 +425,14 @@ public class NMMemoryStateStoreService extends NMStateStoreService { public synchronized void storeNMTokenApplicationMasterKey( ApplicationAttemptId attempt, MasterKey key) throws IOException { MasterKeyPBImpl keypb = (MasterKeyPBImpl) key; - nmTokenState.applicationMasterKeys.put(attempt, + applicationMasterKeys.put(attempt, new MasterKeyPBImpl(keypb.getProto())); } @Override public synchronized void removeNMTokenApplicationMasterKey( ApplicationAttemptId attempt) throws IOException { - nmTokenState.applicationMasterKeys.remove(attempt); + applicationMasterKeys.remove(attempt); } @@ -408,8 +444,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService { new RecoveredContainerTokensState(); result.currentMasterKey = containerTokenState.currentMasterKey; result.previousMasterKey = containerTokenState.previousMasterKey; - result.activeTokens = - new HashMap(containerTokenState.activeTokens); + Map containersTokenMap = + new HashMap(activeTokens); + result.it = new NMMemoryRecoveryIterator>( + containersTokenMap.entrySet().iterator()); return result; } @@ -432,13 +470,13 @@ public class NMMemoryStateStoreService extends NMStateStoreService { @Override public synchronized void storeContainerToken(ContainerId containerId, Long expirationTime) throws IOException { - containerTokenState.activeTokens.put(containerId, expirationTime); + activeTokens.put(containerId, expirationTime); } @Override public synchronized void removeContainerToken(ContainerId containerId) throws IOException { - containerTokenState.activeTokens.remove(containerId); + activeTokens.remove(containerId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 8a8cfa2644e..fcbbc52a3c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -125,6 +125,73 @@ public class TestNMLeveldbStateStoreService { FileUtil.fullyDelete(TMP_DIR); } + private List loadContainersState( + RecoveryIterator it) throws IOException { + List containers = + new ArrayList(); + while (it.hasNext()) { + RecoveredContainerState rcs = it.next(); + containers.add(rcs); + } + return containers; + } + + private List loadApplicationProtos( + RecoveryIterator it) + throws IOException { + List applicationProtos = + new ArrayList(); + while (it.hasNext()) { + applicationProtos.add(it.next()); + } + return applicationProtos; + } + + private List loadDeletionTaskProtos( + RecoveryIterator it) throws IOException { + List deleteTaskProtos = + new ArrayList(); + while (it.hasNext()) { + deleteTaskProtos.add(it.next()); + } + return deleteTaskProtos; + } + + private Map loadUserResources( + RecoveryIterator> it) + throws IOException { + Map userResources = + new HashMap(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + userResources.put(entry.getKey(), entry.getValue()); + } + return userResources; + } + + private Map loadNMTokens( + RecoveryIterator> it) + throws IOException { + Map nmTokens = + new HashMap(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + nmTokens.put(entry.getKey(), entry.getValue()); + } + return nmTokens; + } + + private Map loadContainerTokens( + RecoveryIterator> it) throws IOException { + Map containerTokens = + new HashMap(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + containerTokens.put(entry.getKey(), entry.getValue()); + } + return containerTokens; + } + private void restartStateStore() throws IOException { // need to close so leveldb releases database lock if (stateStore != null) { @@ -142,7 +209,7 @@ public class TestNMLeveldbStateStoreService { assertNotNull(pubts); assertTrue(pubts.getLocalizedResources().isEmpty()); assertTrue(pubts.getInProgressResources().isEmpty()); - assertTrue(state.getUserResources().isEmpty()); + assertTrue(loadUserResources(state.getIterator()).isEmpty()); } @Test @@ -183,7 +250,7 @@ public class TestNMLeveldbStateStoreService { restartStateStore(); Assert.fail("Incompatible version, should expect fail here."); } catch (ServiceStateException e) { - Assert.assertTrue("Exception message mismatch", + Assert.assertTrue("Exception message mismatch", e.getMessage().contains("Incompatible version for NM state:")); } } @@ -192,7 +259,9 @@ public class TestNMLeveldbStateStoreService { public void testApplicationStorage() throws IOException { // test empty when no state RecoveredApplicationsState state = stateStore.loadApplicationsState(); - assertTrue(state.getApplications().isEmpty()); + List apps = + loadApplicationProtos(state.getIterator()); + assertTrue(apps.isEmpty()); // store an application and verify recovered final ApplicationId appId1 = ApplicationId.newInstance(1234, 1); @@ -204,8 +273,9 @@ public class TestNMLeveldbStateStoreService { stateStore.storeApplication(appId1, appProto1); restartStateStore(); state = stateStore.loadApplicationsState(); - assertEquals(1, state.getApplications().size()); - assertEquals(appProto1, state.getApplications().get(0)); + apps = loadApplicationProtos(state.getIterator()); + assertEquals(1, apps.size()); + assertEquals(appProto1, apps.get(0)); // add a new app final ApplicationId appId2 = ApplicationId.newInstance(1234, 2); @@ -216,23 +286,25 @@ public class TestNMLeveldbStateStoreService { stateStore.storeApplication(appId2, appProto2); restartStateStore(); state = stateStore.loadApplicationsState(); - assertEquals(2, state.getApplications().size()); - assertTrue(state.getApplications().contains(appProto1)); - assertTrue(state.getApplications().contains(appProto2)); + apps = loadApplicationProtos(state.getIterator()); + assertEquals(2, apps.size()); + assertTrue(apps.contains(appProto1)); + assertTrue(apps.contains(appProto2)); // test removing an application stateStore.removeApplication(appId2); restartStateStore(); state = stateStore.loadApplicationsState(); - assertEquals(1, state.getApplications().size()); - assertEquals(appProto1, state.getApplications().get(0)); + apps = loadApplicationProtos(state.getIterator()); + assertEquals(1, apps.size()); + assertEquals(appProto1, apps.get(0)); } @Test public void testContainerStorage() throws IOException { // test empty when no state List recoveredContainers = - stateStore.loadContainersState(); + loadContainersState(stateStore.getContainerStateIterator()); assertTrue(recoveredContainers.isEmpty()); // create a container request @@ -254,7 +326,8 @@ public class TestNMLeveldbStateStoreService { stateStore.getContainerVersionKey(containerId.toString())))); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); RecoveredContainerState rcs = recoveredContainers.get(0); assertEquals(0, rcs.getVersion()); @@ -269,14 +342,16 @@ public class TestNMLeveldbStateStoreService { // store a new container record without StartContainerRequest ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6); stateStore.storeContainerLaunched(containerId1); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); // check whether the new container record is discarded assertEquals(1, recoveredContainers.size()); // queue the container, and verify recovered stateStore.storeContainerQueued(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus()); @@ -292,7 +367,8 @@ public class TestNMLeveldbStateStoreService { diags.append("some diags for container"); stateStore.storeContainerDiagnostics(containerId, diags); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); @@ -305,7 +381,8 @@ public class TestNMLeveldbStateStoreService { // pause the container, and verify recovered stateStore.storeContainerPaused(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus()); @@ -316,7 +393,8 @@ public class TestNMLeveldbStateStoreService { // Resume the container stateStore.removeContainerPaused(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); // increase the container size, and verify recovered @@ -328,7 +406,8 @@ public class TestNMLeveldbStateStoreService { stateStore .storeContainerUpdateToken(containerId, updateTokenIdentifier); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(0, rcs.getVersion()); @@ -342,7 +421,8 @@ public class TestNMLeveldbStateStoreService { stateStore.storeContainerDiagnostics(containerId, diags); stateStore.storeContainerKilled(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); @@ -358,7 +438,8 @@ public class TestNMLeveldbStateStoreService { stateStore.storeContainerDiagnostics(containerId, diags); stateStore.storeContainerCompleted(containerId, 21); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus()); @@ -371,7 +452,8 @@ public class TestNMLeveldbStateStoreService { stateStore.storeContainerWorkDir(containerId, "/test/workdir"); stateStore.storeContainerLogDir(containerId, "/test/logdir"); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(6, rcs.getRemainingRetryAttempts()); @@ -382,12 +464,13 @@ public class TestNMLeveldbStateStoreService { // remove the container and verify not recovered stateStore.removeContainer(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertTrue(recoveredContainers.isEmpty()); // recover again to check remove clears all containers restartStateStore(); NMStateStoreService nmStoreSpy = spy(stateStore); - nmStoreSpy.loadContainersState(); + loadContainersState(nmStoreSpy.getContainerStateIterator()); verify(nmStoreSpy,times(0)).removeContainer(any(ContainerId.class)); } @@ -399,7 +482,8 @@ public class TestNMLeveldbStateStoreService { stateStore.storeContainerRestartTimes(containerId, finishTimeForRetryAttempts); restartStateStore(); - RecoveredContainerState rcs = stateStore.loadContainersState().get(0); + RecoveredContainerState rcs = + loadContainersState(stateStore.getContainerStateIterator()).get(0); List recoveredRestartTimes = rcs.getRestartTimes(); assertEquals(1462700529039L, (long)recoveredRestartTimes.get(0)); assertEquals(1462700529050L, (long)recoveredRestartTimes.get(1)); @@ -481,7 +565,7 @@ public class TestNMLeveldbStateStoreService { assertTrue(pubts.getLocalizedResources().isEmpty()); assertTrue(pubts.getInProgressResources().isEmpty()); Map userResources = - state.getUserResources(); + loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); RecoveredUserResources rur = userResources.get(user); LocalResourceTrackerState privts = rur.getPrivateTrackerState(); @@ -535,7 +619,7 @@ public class TestNMLeveldbStateStoreService { pubts.getInProgressResources().get(pubRsrcProto1)); assertEquals(pubRsrcLocalPath2, pubts.getInProgressResources().get(pubRsrcProto2)); - userResources = state.getUserResources(); + userResources = loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); rur = userResources.get(user); privts = rur.getPrivateTrackerState(); @@ -584,7 +668,7 @@ public class TestNMLeveldbStateStoreService { assertTrue(pubts.getLocalizedResources().isEmpty()); assertTrue(pubts.getInProgressResources().isEmpty()); Map userResources = - state.getUserResources(); + loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); RecoveredUserResources rur = userResources.get(user); LocalResourceTrackerState privts = rur.getPrivateTrackerState(); @@ -654,7 +738,7 @@ public class TestNMLeveldbStateStoreService { assertEquals(1, pubts.getInProgressResources().size()); assertEquals(pubRsrcLocalPath2, pubts.getInProgressResources().get(pubRsrcProto2)); - userResources = state.getUserResources(); + userResources = loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); rur = userResources.get(user); privts = rur.getPrivateTrackerState(); @@ -762,7 +846,7 @@ public class TestNMLeveldbStateStoreService { assertEquals(pubLocalizedProto1, pubts.getLocalizedResources().iterator().next()); Map userResources = - state.getUserResources(); + loadUserResources(state.getIterator()); assertTrue(userResources.isEmpty()); } @@ -771,7 +855,9 @@ public class TestNMLeveldbStateStoreService { // test empty when no state RecoveredDeletionServiceState state = stateStore.loadDeletionServiceState(); - assertTrue(state.getTasks().isEmpty()); + List deleteTaskProtos = + loadDeletionTaskProtos(state.getIterator()); + assertTrue(deleteTaskProtos.isEmpty()); // store a deletion task and verify recovered DeletionServiceDeleteTaskProto proto = @@ -788,8 +874,9 @@ public class TestNMLeveldbStateStoreService { stateStore.storeDeletionTask(proto.getId(), proto); restartStateStore(); state = stateStore.loadDeletionServiceState(); - assertEquals(1, state.getTasks().size()); - assertEquals(proto, state.getTasks().get(0)); + deleteTaskProtos = loadDeletionTaskProtos(state.getIterator()); + assertEquals(1, deleteTaskProtos.size()); + assertEquals(proto, deleteTaskProtos.get(0)); // store another deletion task DeletionServiceDeleteTaskProto proto2 = @@ -802,31 +889,36 @@ public class TestNMLeveldbStateStoreService { stateStore.storeDeletionTask(proto2.getId(), proto2); restartStateStore(); state = stateStore.loadDeletionServiceState(); - assertEquals(2, state.getTasks().size()); - assertTrue(state.getTasks().contains(proto)); - assertTrue(state.getTasks().contains(proto2)); + deleteTaskProtos = loadDeletionTaskProtos(state.getIterator()); + assertEquals(2, deleteTaskProtos.size()); + assertTrue(deleteTaskProtos.contains(proto)); + assertTrue(deleteTaskProtos.contains(proto2)); + // delete a task and verify gone after recovery stateStore.removeDeletionTask(proto2.getId()); restartStateStore(); - state = stateStore.loadDeletionServiceState(); - assertEquals(1, state.getTasks().size()); - assertEquals(proto, state.getTasks().get(0)); + state = stateStore.loadDeletionServiceState(); + deleteTaskProtos = loadDeletionTaskProtos(state.getIterator()); + assertEquals(1, deleteTaskProtos.size()); + assertEquals(proto, deleteTaskProtos.get(0)); // delete the last task and verify none left stateStore.removeDeletionTask(proto.getId()); restartStateStore(); state = stateStore.loadDeletionServiceState(); - assertTrue(state.getTasks().isEmpty()); - } + deleteTaskProtos = loadDeletionTaskProtos(state.getIterator()); + assertTrue(deleteTaskProtos.isEmpty()); } @Test public void testNMTokenStorage() throws IOException { // test empty when no state RecoveredNMTokensState state = stateStore.loadNMTokensState(); + Map loadedAppKeys = + loadNMTokens(state.getIterator()); assertNull(state.getCurrentMasterKey()); assertNull(state.getPreviousMasterKey()); - assertTrue(state.getApplicationMasterKeys().isEmpty()); + assertTrue(loadedAppKeys.isEmpty()); // store a master key and verify recovered NMTokenSecretManagerForTest secretMgr = new NMTokenSecretManagerForTest(); @@ -834,18 +926,20 @@ public class TestNMLeveldbStateStoreService { stateStore.storeNMTokenCurrentMasterKey(currentKey); restartStateStore(); state = stateStore.loadNMTokensState(); + loadedAppKeys = loadNMTokens(state.getIterator()); assertEquals(currentKey, state.getCurrentMasterKey()); assertNull(state.getPreviousMasterKey()); - assertTrue(state.getApplicationMasterKeys().isEmpty()); + assertTrue(loadedAppKeys.isEmpty()); // store a previous key and verify recovered MasterKey prevKey = secretMgr.generateKey(); stateStore.storeNMTokenPreviousMasterKey(prevKey); restartStateStore(); state = stateStore.loadNMTokensState(); + loadedAppKeys = loadNMTokens(state.getIterator()); assertEquals(currentKey, state.getCurrentMasterKey()); assertEquals(prevKey, state.getPreviousMasterKey()); - assertTrue(state.getApplicationMasterKeys().isEmpty()); + assertTrue(loadedAppKeys.isEmpty()); // store a few application keys and verify recovered ApplicationAttemptId attempt1 = ApplicationAttemptId.newInstance( @@ -858,10 +952,9 @@ public class TestNMLeveldbStateStoreService { stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2); restartStateStore(); state = stateStore.loadNMTokensState(); + loadedAppKeys = loadNMTokens(state.getIterator()); assertEquals(currentKey, state.getCurrentMasterKey()); assertEquals(prevKey, state.getPreviousMasterKey()); - Map loadedAppKeys = - state.getApplicationMasterKeys(); assertEquals(2, loadedAppKeys.size()); assertEquals(attemptKey1, loadedAppKeys.get(attempt1)); assertEquals(attemptKey2, loadedAppKeys.get(attempt2)); @@ -880,9 +973,9 @@ public class TestNMLeveldbStateStoreService { stateStore.storeNMTokenCurrentMasterKey(currentKey); restartStateStore(); state = stateStore.loadNMTokensState(); + loadedAppKeys = loadNMTokens(state.getIterator()); assertEquals(currentKey, state.getCurrentMasterKey()); assertEquals(prevKey, state.getPreviousMasterKey()); - loadedAppKeys = state.getApplicationMasterKeys(); assertEquals(2, loadedAppKeys.size()); assertNull(loadedAppKeys.get(attempt1)); assertEquals(attemptKey2, loadedAppKeys.get(attempt2)); @@ -894,9 +987,10 @@ public class TestNMLeveldbStateStoreService { // test empty when no state RecoveredContainerTokensState state = stateStore.loadContainerTokensState(); + Map loadedActiveTokens = loadContainerTokens(state.it); assertNull(state.getCurrentMasterKey()); assertNull(state.getPreviousMasterKey()); - assertTrue(state.getActiveTokens().isEmpty()); + assertTrue(loadedActiveTokens.isEmpty()); // store a master key and verify recovered ContainerTokenKeyGeneratorForTest keygen = @@ -905,18 +999,20 @@ public class TestNMLeveldbStateStoreService { stateStore.storeContainerTokenCurrentMasterKey(currentKey); restartStateStore(); state = stateStore.loadContainerTokensState(); + loadedActiveTokens = loadContainerTokens(state.it); assertEquals(currentKey, state.getCurrentMasterKey()); assertNull(state.getPreviousMasterKey()); - assertTrue(state.getActiveTokens().isEmpty()); + assertTrue(loadedActiveTokens.isEmpty()); // store a previous key and verify recovered MasterKey prevKey = keygen.generateKey(); stateStore.storeContainerTokenPreviousMasterKey(prevKey); restartStateStore(); state = stateStore.loadContainerTokensState(); + loadedActiveTokens = loadContainerTokens(state.it); assertEquals(currentKey, state.getCurrentMasterKey()); assertEquals(prevKey, state.getPreviousMasterKey()); - assertTrue(state.getActiveTokens().isEmpty()); + assertTrue(loadedActiveTokens.isEmpty()); // store a few container tokens and verify recovered ContainerId cid1 = BuilderUtils.newContainerId(1, 1, 1, 1); @@ -927,10 +1023,9 @@ public class TestNMLeveldbStateStoreService { stateStore.storeContainerToken(cid2, expTime2); restartStateStore(); state = stateStore.loadContainerTokensState(); + loadedActiveTokens = loadContainerTokens(state.it); assertEquals(currentKey, state.getCurrentMasterKey()); assertEquals(prevKey, state.getPreviousMasterKey()); - Map loadedActiveTokens = - state.getActiveTokens(); assertEquals(2, loadedActiveTokens.size()); assertEquals(expTime1, loadedActiveTokens.get(cid1)); assertEquals(expTime2, loadedActiveTokens.get(cid2)); @@ -948,9 +1043,9 @@ public class TestNMLeveldbStateStoreService { stateStore.storeContainerTokenCurrentMasterKey(currentKey); restartStateStore(); state = stateStore.loadContainerTokensState(); + loadedActiveTokens = loadContainerTokens(state.it); assertEquals(currentKey, state.getCurrentMasterKey()); assertEquals(prevKey, state.getPreviousMasterKey()); - loadedActiveTokens = state.getActiveTokens(); assertEquals(2, loadedActiveTokens.size()); assertNull(loadedActiveTokens.get(cid1)); assertEquals(expTime2, loadedActiveTokens.get(cid2)); @@ -1029,8 +1124,8 @@ public class TestNMLeveldbStateStoreService { @Test public void testUnexpectedKeyDoesntThrowException() throws IOException { // test empty when no state - List recoveredContainers = stateStore - .loadContainersState(); + List recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertTrue(recoveredContainers.isEmpty()); ApplicationId appId = ApplicationId.newInstance(1234, 3); @@ -1045,7 +1140,8 @@ public class TestNMLeveldbStateStoreService { + containerId.toString() + "/invalidKey1234").getBytes(); stateStore.getDB().put(invalidKey, new byte[1]); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); RecoveredContainerState rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus()); @@ -1162,8 +1258,8 @@ public class TestNMLeveldbStateStoreService { @Test public void testStateStoreForResourceMapping() throws IOException { // test empty when no state - List recoveredContainers = stateStore - .loadContainersState(); + List recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertTrue(recoveredContainers.isEmpty()); ApplicationId appId = ApplicationId.newInstance(1234, 3); @@ -1190,7 +1286,8 @@ public class TestNMLeveldbStateStoreService { // add a invalid key restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); assertEquals(1, recoveredContainers.size()); RecoveredContainerState rcs = recoveredContainers.get(0); List res = rcs.getResourceMappings() @@ -1253,7 +1350,8 @@ public class TestNMLeveldbStateStoreService { stateStore.storeContainerRestartTimes(containerId, restartTimes); restartStateStore(); - RecoveredContainerState rcs = stateStore.loadContainersState().get(0); + RecoveredContainerState rcs = + loadContainersState(stateStore.getContainerStateIterator()).get(0); List recoveredRestartTimes = rcs.getRestartTimes(); assertTrue(recoveredRestartTimes.isEmpty()); }