YARN-8242. YARN NM: OOM error while reading back the state store on recovery. Contributed by Pradeep Ambati and Kanwaljeet Sachdev

(cherry picked from commit 65e7469712)
This commit is contained in:
Jason Lowe 2018-08-20 10:14:40 -05:00
parent d7442c244f
commit 44c4928b64
11 changed files with 657 additions and 320 deletions

View File

@ -19,13 +19,14 @@
package org.apache.hadoop.yarn.server.nodemanager;
import static java.util.concurrent.TimeUnit.SECONDS;
import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@ -96,16 +97,20 @@ public class DeletionService extends AbstractService {
private void recover(NMStateStoreService.RecoveredDeletionServiceState state)
throws IOException {
List<DeletionServiceDeleteTaskProto> taskProtos = state.getTasks();
Map<Integer, DeletionTaskRecoveryInfo> idToInfoMap =
new HashMap<>(taskProtos.size());
Set<Integer> successorTasks = new HashSet<>();
for (DeletionServiceDeleteTaskProto proto : taskProtos) {
DeletionTaskRecoveryInfo info =
NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this);
idToInfoMap.put(info.getTask().getTaskId(), info);
nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId()));
successorTasks.addAll(info.getSuccessorTaskIds());
new HashMap<Integer, DeletionTaskRecoveryInfo>();
Set<Integer> successorTasks = new HashSet<Integer>();
try (RecoveryIterator<DeletionServiceDeleteTaskProto> it =
state.getIterator()) {
while (it.hasNext()) {
DeletionServiceDeleteTaskProto proto = it.next();
DeletionTaskRecoveryInfo info =
NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this);
idToInfoMap.put(info.getTask().getTaskId(), info);
nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId()));
successorTasks.addAll(info.getSuccessorTaskIds());
}
}
// restore the task dependencies and schedule the deletion tasks that

View File

@ -23,6 +23,7 @@ import com.google.protobuf.ByteString;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -356,19 +357,26 @@ public class ContainerManagerImpl extends CompositeService implements
stateStore.loadLocalizationState());
RecoveredApplicationsState appsState = stateStore.loadApplicationsState();
for (ContainerManagerApplicationProto proto :
appsState.getApplications()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Recovering application with state: " + proto.toString());
try (RecoveryIterator<ContainerManagerApplicationProto> rasIterator =
appsState.getIterator()) {
while (rasIterator.hasNext()) {
ContainerManagerApplicationProto proto = rasIterator.next();
if (LOG.isDebugEnabled()) {
LOG.debug("Recovering application with state: " + proto.toString());
}
recoverApplication(proto);
}
recoverApplication(proto);
}
for (RecoveredContainerState rcs : stateStore.loadContainersState()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Recovering container with state: " + rcs);
try (RecoveryIterator<RecoveredContainerState> rcsIterator =
stateStore.getContainerStateIterator()) {
while (rcsIterator.hasNext()) {
RecoveredContainerState rcs = rcsIterator.next();
if (LOG.isDebugEnabled()) {
LOG.debug("Recovering container with state: " + rcs);
}
recoverContainer(rcs);
}
recoverContainer(rcs);
}
// Recovery AMRMProxy state after apps and containers are recovered

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -298,42 +300,46 @@ public class ResourceLocalizationService extends CompositeService
//Recover localized resources after an NM restart
public void recoverLocalizedResources(RecoveredLocalizationState state)
throws URISyntaxException {
throws URISyntaxException, IOException {
LocalResourceTrackerState trackerState = state.getPublicTrackerState();
recoverTrackerResources(publicRsrc, trackerState);
for (Map.Entry<String, RecoveredUserResources> userEntry :
state.getUserResources().entrySet()) {
String user = userEntry.getKey();
RecoveredUserResources userResources = userEntry.getValue();
trackerState = userResources.getPrivateTrackerState();
if (!trackerState.isEmpty()) {
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
null, dispatcher, true, super.getConfig(), stateStore, dirsHandler);
LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user,
tracker);
if (oldTracker != null) {
tracker = oldTracker;
}
recoverTrackerResources(tracker, trackerState);
}
for (Map.Entry<ApplicationId, LocalResourceTrackerState> appEntry :
userResources.getAppTrackerStates().entrySet()) {
trackerState = appEntry.getValue();
try (RecoveryIterator<Map.Entry<String, RecoveredUserResources>> it
= state.getIterator()) {
while (it.hasNext()) {
Map.Entry<String, RecoveredUserResources> userEntry = it.next();
String user = userEntry.getKey();
RecoveredUserResources userResources = userEntry.getValue();
trackerState = userResources.getPrivateTrackerState();
if (!trackerState.isEmpty()) {
ApplicationId appId = appEntry.getKey();
String appIdStr = appId.toString();
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
appId, dispatcher, false, super.getConfig(), stateStore,
null, dispatcher, true, super.getConfig(), stateStore,
dirsHandler);
LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr,
LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user,
tracker);
if (oldTracker != null) {
tracker = oldTracker;
}
recoverTrackerResources(tracker, trackerState);
}
for (Map.Entry<ApplicationId, LocalResourceTrackerState> appEntry :
userResources.getAppTrackerStates().entrySet()) {
trackerState = appEntry.getValue();
if (!trackerState.isEmpty()) {
ApplicationId appId = appEntry.getKey();
String appIdStr = appId.toString();
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
appId, dispatcher, false, super.getConfig(), stateStore,
dirsHandler);
LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr,
tracker);
if (oldTracker != null) {
tracker = oldTracker;
}
recoverTrackerResources(tracker, trackerState);
}
}
}
}
}
@ -559,7 +565,7 @@ public class ResourceLocalizationService extends CompositeService
rsrcCleanup.getResources();
for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
rsrcs.entrySet()) {
LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
c.getContainerId().getApplicationAttemptId()
.getApplicationId());
for (LocalResourceRequest req : e.getValue()) {

View File

@ -66,6 +66,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@ -73,6 +74,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
@ -225,68 +227,119 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
return isHealthy;
}
@Override
public List<RecoveredContainerState> loadContainersState()
// LeveldbIterator starting at startkey
private LeveldbIterator getLevelDBIterator(String startKey)
throws IOException {
ArrayList<RecoveredContainerState> containers =
new ArrayList<RecoveredContainerState>();
ArrayList<ContainerId> containersToRemove =
new ArrayList<ContainerId>();
LeveldbIterator iter = null;
try {
iter = new LeveldbIterator(db);
iter.seek(bytes(CONTAINERS_KEY_PREFIX));
LeveldbIterator it = new LeveldbIterator(db);
it.seek(bytes(startKey));
return it;
} catch (DBException e) {
throw new IOException(e);
}
}
while (iter.hasNext()) {
Entry<byte[], byte[]> entry = iter.peekNext();
/**
 * Shared look-ahead implementation of {@link RecoveryIterator} on top of a
 * {@link LeveldbIterator}. Subclasses only decide how one logical item is
 * decoded from the underlying key/value stream; a {@code null} result from
 * {@link #getNextItem(LeveldbIterator)} is treated as "no more items".
 */
private abstract class BaseRecoveryIterator<T> implements
    RecoveryIterator<T> {
  LeveldbIterator it;
  // Item fetched by hasNext() but not yet handed out by next().
  T nextItem;

  /**
   * @param dbKey key the underlying LevelDB iterator is positioned at
   * @throws IOException if the LevelDB iterator cannot be created
   */
  BaseRecoveryIterator(String dbKey) throws IOException {
    this.it = getLevelDBIterator(dbKey);
    this.nextItem = null;
  }

  /**
   * Decodes the next item from the given iterator.
   *
   * @return the next item, or {@code null} when nothing is left
   */
  protected abstract T getNextItem(LeveldbIterator it) throws IOException;

  @Override
  public boolean hasNext() throws IOException {
    // Fetch ahead only when nothing is buffered yet.
    if (nextItem == null) {
      nextItem = getNextItem(it);
    }
    return (nextItem != null);
  }

  @Override
  public T next() throws IOException, NoSuchElementException {
    // Prefer the buffered item; otherwise pull one directly.
    T result = (nextItem != null) ? nextItem : getNextItem(it);
    nextItem = null;
    if (result == null) {
      throw new NoSuchElementException();
    }
    return result;
  }

  @Override
  public void close() throws IOException {
    if (it != null) {
      it.close();
    }
  }
}
/**
 * Recovery iterator over container state. The underlying LevelDB iterator is
 * opened at CONTAINERS_KEY_PREFIX and each item is produced by
 * getNextRecoveredContainer.
 */
private class ContainerStateIterator extends
BaseRecoveryIterator<RecoveredContainerState> {
ContainerStateIterator() throws IOException {
super(CONTAINERS_KEY_PREFIX);
}
// Delegates decoding of one container's records to the enclosing store.
@Override
protected RecoveredContainerState getNextItem(LeveldbIterator it)
throws IOException {
return getNextRecoveredContainer(it);
}
}
private RecoveredContainerState getNextRecoveredContainer(LeveldbIterator it)
throws IOException {
RecoveredContainerState rcs = null;
try {
while (it.hasNext()) {
Entry<byte[], byte[]> entry = it.peekNext();
String key = asString(entry.getKey());
if (!key.startsWith(CONTAINERS_KEY_PREFIX)) {
break;
return null;
}
int idEndPos = key.indexOf('/', CONTAINERS_KEY_PREFIX.length());
if (idEndPos < 0) {
throw new IOException("Unable to determine container in key: " + key);
}
ContainerId containerId = ContainerId.fromString(
key.substring(CONTAINERS_KEY_PREFIX.length(), idEndPos));
String keyPrefix = key.substring(0, idEndPos+1);
RecoveredContainerState rcs = loadContainerState(containerId,
iter, keyPrefix);
// Don't load container without StartContainerRequest
String keyPrefix = key.substring(0, idEndPos + 1);
rcs = loadContainerState(it, keyPrefix);
if (rcs.startRequest != null) {
containers.add(rcs);
break;
} else {
containersToRemove.add(containerId);
removeContainer(rcs.getContainerId());
rcs = null;
}
}
} catch (DBException e) {
throw new IOException(e);
} finally {
if (iter != null) {
iter.close();
}
}
// remove container without StartContainerRequest
for (ContainerId containerId : containersToRemove) {
LOG.warn("Remove container " + containerId +
" with incomplete records");
try {
removeContainer(containerId);
// TODO: kill and cleanup the leaked container
} catch (IOException e) {
LOG.error("Unable to remove container " + containerId +
" in store", e);
}
}
return containers;
return rcs;
}
private RecoveredContainerState loadContainerState(ContainerId containerId,
LeveldbIterator iter, String keyPrefix) throws IOException {
RecoveredContainerState rcs = new RecoveredContainerState();
/**
 * Creates a new iterator over the recovered container state.
 * The returned RecoveryIterator is Closeable and wraps a LevelDB iterator,
 * so the caller should close it when done.
 *
 * @return a fresh ContainerStateIterator
 * @throws IOException if the iterator cannot be created
 */
@Override
public RecoveryIterator<RecoveredContainerState> getContainerStateIterator()
throws IOException {
return new ContainerStateIterator();
}
private RecoveredContainerState loadContainerState(LeveldbIterator iter,
String keyPrefix) throws IOException {
ContainerId containerId = ContainerId.fromString(
keyPrefix.substring(CONTAINERS_KEY_PREFIX.length(),
keyPrefix.length()-1));
RecoveredContainerState rcs = new RecoveredContainerState(containerId);
rcs.status = RecoveredContainerStatus.REQUESTED;
while (iter.hasNext()) {
Entry<byte[],byte[]> entry = iter.peekNext();
@ -680,35 +733,45 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
}
/**
 * Recovery iterator over application records. The underlying LevelDB
 * iterator is opened at APPLICATIONS_KEY_PREFIX and each item is produced by
 * getNextRecoveredApplication.
 */
private class ApplicationStateIterator extends
BaseRecoveryIterator<ContainerManagerApplicationProto> {
ApplicationStateIterator() throws IOException {
super(APPLICATIONS_KEY_PREFIX);
}
// Delegates decoding of one application record to the enclosing store.
@Override
protected ContainerManagerApplicationProto getNextItem(LeveldbIterator it)
throws IOException {
return getNextRecoveredApplication(it);
}
}
/**
 * Reads the next application record from the given LevelDB iterator.
 *
 * @param it iterator positioned inside (or just past) the application keys
 * @return the parsed application proto, or {@code null} when the iterator is
 *         exhausted or the next key is outside APPLICATIONS_KEY_PREFIX
 * @throws IOException if LevelDB reports an error or the value cannot be
 *         parsed
 */
private ContainerManagerApplicationProto getNextRecoveredApplication(
    LeveldbIterator it) throws IOException {
  try {
    if (!it.hasNext()) {
      return null;
    }
    Entry<byte[], byte[]> kv = it.next();
    String dbKey = asString(kv.getKey());
    // Leaving the prefix range means there are no more application records.
    if (!dbKey.startsWith(APPLICATIONS_KEY_PREFIX)) {
      return null;
    }
    return ContainerManagerApplicationProto.parseFrom(kv.getValue());
  } catch (DBException e) {
    throw new IOException(e);
  }
}
@Override
public RecoveredApplicationsState loadApplicationsState()
throws IOException {
RecoveredApplicationsState state = new RecoveredApplicationsState();
state.applications = new ArrayList<ContainerManagerApplicationProto>();
String keyPrefix = APPLICATIONS_KEY_PREFIX;
LeveldbIterator iter = null;
try {
iter = new LeveldbIterator(db);
iter.seek(bytes(keyPrefix));
while (iter.hasNext()) {
Entry<byte[], byte[]> entry = iter.next();
String key = asString(entry.getKey());
if (!key.startsWith(keyPrefix)) {
break;
}
state.applications.add(
ContainerManagerApplicationProto.parseFrom(entry.getValue()));
}
} catch (DBException e) {
throw new IOException(e);
} finally {
if (iter != null) {
iter.close();
}
}
state.it = new ApplicationStateIterator();
cleanupDeprecatedFinishedApps();
return state;
}
@ -752,24 +815,29 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
}
@Override
public RecoveredLocalizationState loadLocalizationState()
throws IOException {
RecoveredLocalizationState state = new RecoveredLocalizationState();
// User Resource Recovery Iterator.
private class UserResourcesIterator extends
BaseRecoveryIterator<Entry<String, RecoveredUserResources>> {
UserResourcesIterator() throws IOException {
super(LOCALIZATION_PRIVATE_KEY_PREFIX);
}
LeveldbIterator iter = null;
@Override
protected Entry<String, RecoveredUserResources> getNextItem(
LeveldbIterator it) throws IOException {
return getNextRecoveredPrivateLocalizationEntry(it);
}
}
private Entry<String, RecoveredUserResources> getNextRecoveredPrivateLocalizationEntry(
LeveldbIterator it) throws IOException {
Entry<String, RecoveredUserResources> localEntry = null;
try {
iter = new LeveldbIterator(db);
iter.seek(bytes(LOCALIZATION_PUBLIC_KEY_PREFIX));
state.publicTrackerState = loadResourceTrackerState(iter,
LOCALIZATION_PUBLIC_KEY_PREFIX);
iter.seek(bytes(LOCALIZATION_PRIVATE_KEY_PREFIX));
while (iter.hasNext()) {
Entry<byte[],byte[]> entry = iter.peekNext();
if (it.hasNext()) {
Entry<byte[], byte[]> entry = it.peekNext();
String key = asString(entry.getKey());
if (!key.startsWith(LOCALIZATION_PRIVATE_KEY_PREFIX)) {
break;
return null;
}
int userEndPos = key.indexOf('/',
@ -780,17 +848,24 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
}
String user = key.substring(
LOCALIZATION_PRIVATE_KEY_PREFIX.length(), userEndPos);
state.userResources.put(user, loadUserLocalizedResources(iter,
key.substring(0, userEndPos+1)));
RecoveredUserResources val = loadUserLocalizedResources(it,
key.substring(0, userEndPos+1));
localEntry = new AbstractMap.SimpleEntry<>(user, val);
}
} catch (DBException e) {
throw new IOException(e);
} finally {
if (iter != null) {
iter.close();
}
}
return localEntry;
}
/**
 * Loads the recovered localization state.
 * The public tracker state is read eagerly; per-user resources are exposed
 * through the iterator stored on the returned state, which the caller is
 * expected to close.
 *
 * @return the recovered localization state
 * @throws IOException if the state store cannot be read
 */
@Override
public RecoveredLocalizationState loadLocalizationState()
    throws IOException {
  RecoveredLocalizationState state = new RecoveredLocalizationState();
  LeveldbIterator it = getLevelDBIterator(LOCALIZATION_PUBLIC_KEY_PREFIX);
  try {
    state.publicTrackerState = loadResourceTrackerState(it,
        LOCALIZATION_PUBLIC_KEY_PREFIX);
  } catch (DBException e) {
    // Keep the store's convention of surfacing LevelDB failures as
    // IOException instead of letting the runtime exception escape.
    throw new IOException(e);
  } finally {
    // This iterator is only needed for the public-resource scan above and is
    // not handed to the caller, so release it here to avoid leaking it.
    it.close();
  }
  state.it = new UserResourcesIterator();
  return state;
}
@ -800,7 +875,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
final String startedPrefix = keyPrefix + LOCALIZATION_STARTED_SUFFIX;
LocalResourceTrackerState state = new LocalResourceTrackerState();
while (iter.hasNext()) {
Entry<byte[],byte[]> entry = iter.peekNext();
Entry<byte[], byte[]> entry = iter.peekNext();
String key = asString(entry.getKey());
if (!key.startsWith(keyPrefix)) {
break;
@ -981,32 +1056,44 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
+ LOCALIZATION_APPCACHE_SUFFIX + appId + "/";
}
/**
 * Recovery iterator over deletion-service tasks. The underlying LevelDB
 * iterator is opened at DELETION_TASK_KEY_PREFIX and each item is produced
 * by getNextRecoveredDeletionService.
 */
private class DeletionStateIterator extends
BaseRecoveryIterator<DeletionServiceDeleteTaskProto> {
DeletionStateIterator() throws IOException {
super(DELETION_TASK_KEY_PREFIX);
}
// Delegates decoding of one deletion task record to the enclosing store.
@Override
protected DeletionServiceDeleteTaskProto getNextItem(LeveldbIterator it)
throws IOException {
return getNextRecoveredDeletionService(it);
}
}
/**
 * Reads the next deletion task record from the given LevelDB iterator.
 *
 * @param it iterator positioned inside (or just past) the deletion task keys
 * @return the parsed task proto, or {@code null} when the iterator is
 *         exhausted or the next key is outside DELETION_TASK_KEY_PREFIX
 * @throws IOException if LevelDB reports an error or the value cannot be
 *         parsed
 */
private DeletionServiceDeleteTaskProto getNextRecoveredDeletionService(
    LeveldbIterator it) throws IOException {
  try {
    if (!it.hasNext()) {
      return null;
    }
    Entry<byte[], byte[]> kv = it.next();
    String dbKey = asString(kv.getKey());
    // Leaving the prefix range means there are no more deletion tasks.
    if (!dbKey.startsWith(DELETION_TASK_KEY_PREFIX)) {
      return null;
    }
    return DeletionServiceDeleteTaskProto.parseFrom(kv.getValue());
  } catch (DBException e) {
    throw new IOException(e);
  }
}
@Override
public RecoveredDeletionServiceState loadDeletionServiceState()
throws IOException {
RecoveredDeletionServiceState state = new RecoveredDeletionServiceState();
state.tasks = new ArrayList<DeletionServiceDeleteTaskProto>();
LeveldbIterator iter = null;
try {
iter = new LeveldbIterator(db);
iter.seek(bytes(DELETION_TASK_KEY_PREFIX));
while (iter.hasNext()) {
Entry<byte[], byte[]> entry = iter.next();
String key = asString(entry.getKey());
if (!key.startsWith(DELETION_TASK_KEY_PREFIX)) {
break;
}
state.tasks.add(
DeletionServiceDeleteTaskProto.parseFrom(entry.getValue()));
}
} catch (DBException e) {
throw new IOException(e);
} finally {
if (iter != null) {
iter.close();
}
}
state.it = new DeletionStateIterator();
return state;
}
@ -1033,29 +1120,44 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
}
}
/**
 * Looks up a single master key by its full database key.
 *
 * @param dbKey full LevelDB key of the master key record
 * @return the parsed master key, or {@code null} if the record is absent or
 *         empty
 * @throws IOException if LevelDB reports an error or parsing fails
 */
private MasterKey getMasterKey(String dbKey) throws IOException {
  try {
    byte[] raw = db.get(bytes(dbKey));
    boolean absent = (raw == null) || (raw.length == 0);
    return absent ? null : parseMasterKey(raw);
  } catch (DBException e) {
    throw new IOException(e);
  }
}
@Override
public RecoveredNMTokensState loadNMTokensState() throws IOException {
RecoveredNMTokensState state = new RecoveredNMTokensState();
state.applicationMasterKeys =
new HashMap<ApplicationAttemptId, MasterKey>();
LeveldbIterator iter = null;
/**
 * Recovery iterator over NM token master keys keyed by application attempt.
 * The underlying LevelDB iterator is opened at NM_TOKENS_KEY_PREFIX and each
 * item is produced by getNextMasterKeyEntry.
 */
private class NMTokensStateIterator extends
BaseRecoveryIterator<Entry<ApplicationAttemptId, MasterKey>> {
NMTokensStateIterator() throws IOException {
super(NM_TOKENS_KEY_PREFIX);
}
// Delegates decoding of one attempt/master-key entry to the enclosing store.
@Override
protected Entry<ApplicationAttemptId, MasterKey> getNextItem(
LeveldbIterator it) throws IOException {
return getNextMasterKeyEntry(it);
}
}
private Entry<ApplicationAttemptId, MasterKey> getNextMasterKeyEntry(
LeveldbIterator it) throws IOException {
Entry<ApplicationAttemptId, MasterKey> masterKeyentry = null;
try {
iter = new LeveldbIterator(db);
iter.seek(bytes(NM_TOKENS_KEY_PREFIX));
while (iter.hasNext()) {
Entry<byte[], byte[]> entry = iter.next();
while (it.hasNext()) {
Entry<byte[], byte[]> entry = it.next();
String fullKey = asString(entry.getKey());
if (!fullKey.startsWith(NM_TOKENS_KEY_PREFIX)) {
break;
}
String key = fullKey.substring(NM_TOKENS_KEY_PREFIX.length());
if (key.equals(CURRENT_MASTER_KEY_SUFFIX)) {
state.currentMasterKey = parseMasterKey(entry.getValue());
} else if (key.equals(PREV_MASTER_KEY_SUFFIX)) {
state.previousMasterKey = parseMasterKey(entry.getValue());
} else if (key.startsWith(
ApplicationAttemptId.appAttemptIdStrPrefix)) {
if (key.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
ApplicationAttemptId attempt;
try {
attempt = ApplicationAttemptId.fromString(key);
@ -1063,17 +1165,25 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
throw new IOException("Bad application master key state for "
+ fullKey, e);
}
state.applicationMasterKeys.put(attempt,
masterKeyentry = new AbstractMap.SimpleEntry<>(attempt,
parseMasterKey(entry.getValue()));
break;
}
}
} catch (DBException e) {
throw new IOException(e);
} finally {
if (iter != null) {
iter.close();
}
}
return masterKeyentry;
}
/**
 * Loads the recovered NM token state.
 * The current and previous master keys are fetched with direct lookups; the
 * per-attempt keys are exposed through the iterator stored on the returned
 * state.
 *
 * @return the recovered NM token state
 * @throws IOException if the state store cannot be read
 */
@Override
public RecoveredNMTokensState loadNMTokensState() throws IOException {
  RecoveredNMTokensState recovered = new RecoveredNMTokensState();
  final String currentKey = NM_TOKENS_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX;
  final String previousKey = NM_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX;
  recovered.currentMasterKey = getMasterKey(currentKey);
  recovered.previousMasterKey = getMasterKey(previousKey);
  recovered.it = new NMTokensStateIterator();
  return recovered;
}
@ -1122,45 +1232,45 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
}
}
// Recover ContainersToken Iterator.
private class ContainerTokensStateIterator extends
BaseRecoveryIterator<Entry<ContainerId, Long>> {
ContainerTokensStateIterator() throws IOException {
super(CONTAINER_TOKENS_KEY_PREFIX);
}
@Override
public RecoveredContainerTokensState loadContainerTokensState()
@Override
protected Entry<ContainerId, Long> getNextItem(LeveldbIterator it)
throws IOException {
return getNextContainerToken(it);
}
}
private Entry<ContainerId, Long> getNextContainerToken(LeveldbIterator it)
throws IOException {
RecoveredContainerTokensState state = new RecoveredContainerTokensState();
state.activeTokens = new HashMap<ContainerId, Long>();
LeveldbIterator iter = null;
Entry<ContainerId, Long> containerTokenEntry = null;
try {
iter = new LeveldbIterator(db);
iter.seek(bytes(CONTAINER_TOKENS_KEY_PREFIX));
final int containerTokensKeyPrefixLength =
CONTAINER_TOKENS_KEY_PREFIX.length();
while (iter.hasNext()) {
Entry<byte[], byte[]> entry = iter.next();
while (it.hasNext()) {
Entry<byte[], byte[]> entry = it.next();
String fullKey = asString(entry.getKey());
if (!fullKey.startsWith(CONTAINER_TOKENS_KEY_PREFIX)) {
break;
}
String key = fullKey.substring(containerTokensKeyPrefixLength);
if (key.equals(CURRENT_MASTER_KEY_SUFFIX)) {
state.currentMasterKey = parseMasterKey(entry.getValue());
} else if (key.equals(PREV_MASTER_KEY_SUFFIX)) {
state.previousMasterKey = parseMasterKey(entry.getValue());
} else if (key.startsWith(ConverterUtils.CONTAINER_PREFIX)) {
loadContainerToken(state, fullKey, key, entry.getValue());
String key = fullKey.substring(CONTAINER_TOKENS_KEY_PREFIX.length());
if (key.startsWith(ConverterUtils.CONTAINER_PREFIX)) {
containerTokenEntry = loadContainerToken(fullKey, key,
entry.getValue());
break;
}
}
} catch (DBException e) {
throw new IOException(e);
} finally {
if (iter != null) {
iter.close();
}
}
return state;
return containerTokenEntry;
}
private static void loadContainerToken(RecoveredContainerTokensState state,
String key, String containerIdStr, byte[] value) throws IOException {
private static Entry<ContainerId, Long> loadContainerToken(String key,
String containerIdStr, byte[] value) throws IOException {
ContainerId containerId;
Long expTime;
try {
@ -1169,7 +1279,19 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
} catch (IllegalArgumentException e) {
throw new IOException("Bad container token state for " + key, e);
}
state.activeTokens.put(containerId, expTime);
return new AbstractMap.SimpleEntry<>(containerId, expTime);
}
/**
 * Loads the recovered container token state.
 * The current and previous master keys are fetched with direct lookups; the
 * per-container token entries are exposed through the iterator stored on the
 * returned state.
 *
 * @return the recovered container token state
 * @throws IOException if the state store cannot be read
 */
@Override
public RecoveredContainerTokensState loadContainerTokensState()
    throws IOException {
  RecoveredContainerTokensState recovered =
      new RecoveredContainerTokensState();
  final String currentKey =
      CONTAINER_TOKENS_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX;
  final String previousKey =
      CONTAINER_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX;
  recovered.currentMasterKey = getMasterKey(currentKey);
  recovered.previousMasterKey = getMasterKey(previousKey);
  recovered.it = new ContainerTokensStateIterator();
  return recovered;
}
@Override

View File

@ -65,7 +65,7 @@ public class NMNullStateStoreService extends NMStateStoreService {
}
@Override
public List<RecoveredContainerState> loadContainersState()
public RecoveryIterator<RecoveredContainerState> getContainerStateIterator()
throws IOException {
throw new UnsupportedOperationException(
"Recovery not supported by this state store");

View File

@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@ -67,12 +68,11 @@ public abstract class NMStateStoreService extends AbstractService {
}
public static class RecoveredApplicationsState {
List<ContainerManagerApplicationProto> applications;
RecoveryIterator<ContainerManagerApplicationProto> it = null;
public List<ContainerManagerApplicationProto> getApplications() {
return applications;
public RecoveryIterator<ContainerManagerApplicationProto> getIterator() {
return it;
}
}
/**
@ -106,6 +106,15 @@ public abstract class NMStateStoreService extends AbstractService {
RecoveredContainerType.RECOVER;
private long startTime;
private ResourceMappings resMappings = new ResourceMappings();
private final ContainerId containerId;
RecoveredContainerState(ContainerId containerId){
this.containerId = containerId;
}
public ContainerId getContainerId() {
return containerId;
}
public RecoveredContainerStatus getStatus() {
return status;
@ -248,30 +257,33 @@ public abstract class NMStateStoreService extends AbstractService {
public static class RecoveredLocalizationState {
LocalResourceTrackerState publicTrackerState =
new LocalResourceTrackerState();
Map<String, RecoveredUserResources> userResources =
new HashMap<String, RecoveredUserResources>();
RecoveryIterator<Entry<String, RecoveredUserResources>> it = null;
public LocalResourceTrackerState getPublicTrackerState() {
return publicTrackerState;
}
public Map<String, RecoveredUserResources> getUserResources() {
return userResources;
public RecoveryIterator<Entry<String, RecoveredUserResources>> getIterator() {
return it;
}
}
public static class RecoveredDeletionServiceState {
List<DeletionServiceDeleteTaskProto> tasks;
RecoveryIterator<DeletionServiceDeleteTaskProto> it = null;
public List<DeletionServiceDeleteTaskProto> getTasks() {
return tasks;
public RecoveryIterator<DeletionServiceDeleteTaskProto> getIterator(){
return it;
}
}
public static class RecoveredNMTokensState {
MasterKey currentMasterKey;
MasterKey previousMasterKey;
Map<ApplicationAttemptId, MasterKey> applicationMasterKeys;
RecoveryIterator<Entry<ApplicationAttemptId, MasterKey>> it = null;
public RecoveryIterator<Entry<ApplicationAttemptId, MasterKey>> getIterator() {
return it;
}
public MasterKey getCurrentMasterKey() {
return currentMasterKey;
@ -281,15 +293,16 @@ public abstract class NMStateStoreService extends AbstractService {
return previousMasterKey;
}
public Map<ApplicationAttemptId, MasterKey> getApplicationMasterKeys() {
return applicationMasterKeys;
}
}
public static class RecoveredContainerTokensState {
MasterKey currentMasterKey;
MasterKey previousMasterKey;
Map<ContainerId, Long> activeTokens;
RecoveryIterator<Entry<ContainerId, Long>> it = null;
public RecoveryIterator<Entry<ContainerId, Long>> getIterator() {
return it;
}
public MasterKey getCurrentMasterKey() {
return currentMasterKey;
@ -299,9 +312,6 @@ public abstract class NMStateStoreService extends AbstractService {
return previousMasterKey;
}
public Map<ContainerId, Long> getActiveTokens() {
return activeTokens;
}
}
public static class RecoveredLogDeleterState {
@ -400,11 +410,10 @@ public abstract class NMStateStoreService extends AbstractService {
/**
* Load the state of containers
* @return recovered state for containers
* @throws IOException
* get the Recovered Container State Iterator
* @return recovery iterator
*/
public abstract List<RecoveredContainerState> loadContainersState()
public abstract RecoveryIterator<RecoveredContainerState> getContainerStateIterator()
throws IOException;
/**

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.recovery;
import java.io.Closeable;
import java.io.IOException;
import java.util.NoSuchElementException;
/**
 * A wrapper for an Iterator to translate the raw RuntimeExceptions that
 * can be thrown into IOException.
 * <p>
 * Instances are {@link Closeable}; callers should close the iterator once
 * they are done with it, for example with try-with-resources.
 *
 * @param <T> type of element produced by the iteration
 */
public interface RecoveryIterator<T> extends Closeable {
/**
 * Returns true if the iteration has more elements.
 *
 * @return true if a subsequent call to {@link #next()} can return an element
 * @throws IOException if the underlying state cannot be read
 */
boolean hasNext() throws IOException;
/**
 * Returns the next element in the iteration.
 *
 * @return the next element
 * @throws IOException if the underlying state cannot be read
 * @throws NoSuchElementException if the iteration has no more elements
 */
T next() throws IOException, NoSuchElementException;
}

View File

@ -24,6 +24,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -90,17 +92,20 @@ public class NMContainerTokenSecretManager extends
super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1;
}
for (Entry<ContainerId, Long> entry : state.getActiveTokens().entrySet()) {
ContainerId containerId = entry.getKey();
Long expTime = entry.getValue();
List<ContainerId> containerList =
recentlyStartedContainerTracker.get(expTime);
if (containerList == null) {
containerList = new ArrayList<ContainerId>();
recentlyStartedContainerTracker.put(expTime, containerList);
}
if (!containerList.contains(containerId)) {
containerList.add(containerId);
try (RecoveryIterator<Entry<ContainerId, Long>> it = state.getIterator()) {
while (it.hasNext()) {
Entry<ContainerId, Long> entry = it.next();
ContainerId containerId = entry.getKey();
Long expTime = entry.getValue();
List<ContainerId> containerList =
recentlyStartedContainerTracker.get(expTime);
if (containerList == null) {
containerList = new ArrayList<ContainerId>();
recentlyStartedContainerTracker.put(expTime, containerList);
}
if (!containerList.contains(containerId)) {
containerList.add(containerId);
}
}
}
}

View File

@ -23,6 +23,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -87,11 +89,14 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager {
super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1;
}
for (Map.Entry<ApplicationAttemptId, MasterKey> entry :
state.getApplicationMasterKeys().entrySet()) {
key = entry.getValue();
oldMasterKeys.put(entry.getKey(),
new MasterKeyData(key, createSecretKey(key.getBytes().array())));
try (RecoveryIterator<Map.Entry<ApplicationAttemptId, MasterKey>> it =
state.getIterator()) {
while (it.hasNext()) {
Map.Entry<ApplicationAttemptId, MasterKey> entry = it.next();
key = entry.getValue();
oldMasterKeys.put(entry.getKey(),
new MasterKeyData(key, createSecretKey(key.getBytes().array())));
}
}
// reconstruct app to app attempts map

View File

@ -23,6 +23,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -56,6 +57,8 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
private RecoveredNMTokensState nmTokenState;
private RecoveredContainerTokensState containerTokenState;
private Map<ApplicationAttemptId, MasterKey> applicationMasterKeys;
private Map<ContainerId, Long> activeTokens;
private Map<ApplicationId, LogDeleterProto> logDeleterState;
private RecoveredAMRMProxyState amrmProxyState;
@ -68,10 +71,9 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
apps = new HashMap<ApplicationId, ContainerManagerApplicationProto>();
containerStates = new HashMap<ContainerId, RecoveredContainerState>();
nmTokenState = new RecoveredNMTokensState();
nmTokenState.applicationMasterKeys =
new HashMap<ApplicationAttemptId, MasterKey>();
applicationMasterKeys = new HashMap<ApplicationAttemptId, MasterKey>();
containerTokenState = new RecoveredContainerTokensState();
containerTokenState.activeTokens = new HashMap<ContainerId, Long>();
activeTokens = new HashMap<ContainerId, Long>();
trackerStates = new HashMap<TrackerKey, TrackerState>();
deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
logDeleterState = new HashMap<ApplicationId, LogDeleterProto>();
@ -86,13 +88,39 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
protected void closeStorage() {
// Intentionally empty: this implementation does nothing when storage is
// closed. NOTE(review): presumably because all state is kept in in-memory
// maps that need no explicit release -- confirm.
}
// Recovery Iterator Implementation.
private class NMMemoryRecoveryIterator<T> implements RecoveryIterator<T> {
private Iterator<T> it;
NMMemoryRecoveryIterator(Iterator<T> it){
this.it = it;
}
@Override
public boolean hasNext() {
return it.hasNext();
}
@Override
public T next() throws IOException {
return it.next();
}
@Override
public void close() throws IOException {
}
}
@Override
public synchronized RecoveredApplicationsState loadApplicationsState()
throws IOException {
RecoveredApplicationsState state = new RecoveredApplicationsState();
state.applications = new ArrayList<ContainerManagerApplicationProto>(
apps.values());
List<ContainerManagerApplicationProto> containerList =
new ArrayList<ContainerManagerApplicationProto>(apps.values());
state.it = new NMMemoryRecoveryIterator<ContainerManagerApplicationProto>(
containerList.iterator());
return state;
}
@ -111,13 +139,13 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
}
@Override
public synchronized List<RecoveredContainerState> loadContainersState()
public RecoveryIterator<RecoveredContainerState> getContainerStateIterator()
throws IOException {
// return a copy so caller can't modify our state
List<RecoveredContainerState> result =
new ArrayList<RecoveredContainerState>(containerStates.size());
for (RecoveredContainerState rcs : containerStates.values()) {
RecoveredContainerState rcsCopy = new RecoveredContainerState();
RecoveredContainerState rcsCopy = new RecoveredContainerState(rcs.getContainerId());
rcsCopy.status = rcs.status;
rcsCopy.exitCode = rcs.exitCode;
rcsCopy.killed = rcs.killed;
@ -131,13 +159,14 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
rcsCopy.setResourceMappings(rcs.getResourceMappings());
result.add(rcsCopy);
}
return result;
return new NMMemoryRecoveryIterator<RecoveredContainerState>(
result.iterator());
}
@Override
public synchronized void storeContainer(ContainerId containerId,
int version, long startTime, StartContainerRequest startRequest) {
RecoveredContainerState rcs = new RecoveredContainerState();
RecoveredContainerState rcs = new RecoveredContainerState(containerId);
rcs.startRequest = startRequest;
rcs.version = version;
try {
@ -284,6 +313,8 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
@Override
public synchronized RecoveredLocalizationState loadLocalizationState() {
RecoveredLocalizationState result = new RecoveredLocalizationState();
Map<String, RecoveredUserResources> userResources =
new HashMap<String, RecoveredUserResources>();
for (Map.Entry<TrackerKey, TrackerState> e : trackerStates.entrySet()) {
TrackerKey tk = e.getKey();
TrackerState ts = e.getValue();
@ -294,10 +325,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
if (tk.user == null) {
result.publicTrackerState = loadTrackerState(ts);
} else {
RecoveredUserResources rur = result.userResources.get(tk.user);
RecoveredUserResources rur = userResources.get(tk.user);
if (rur == null) {
rur = new RecoveredUserResources();
result.userResources.put(tk.user, rur);
userResources.put(tk.user, rur);
}
if (tk.appId == null) {
rur.privateTrackerState = loadTrackerState(ts);
@ -306,6 +337,8 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
}
}
}
result.it = new NMMemoryRecoveryIterator<Map.Entry<String, RecoveredUserResources>>(
userResources.entrySet().iterator());
return result;
}
@ -341,8 +374,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
throws IOException {
RecoveredDeletionServiceState result =
new RecoveredDeletionServiceState();
result.tasks = new ArrayList<DeletionServiceDeleteTaskProto>(
deleteTasks.values());
List<DeletionServiceDeleteTaskProto> deleteTaskProtos =
new ArrayList<DeletionServiceDeleteTaskProto>(deleteTasks.values());
result.it = new NMMemoryRecoveryIterator<DeletionServiceDeleteTaskProto>(
deleteTaskProtos.iterator());
return result;
}
@ -365,9 +400,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
RecoveredNMTokensState result = new RecoveredNMTokensState();
result.currentMasterKey = nmTokenState.currentMasterKey;
result.previousMasterKey = nmTokenState.previousMasterKey;
result.applicationMasterKeys =
new HashMap<ApplicationAttemptId, MasterKey>(
nmTokenState.applicationMasterKeys);
Map<ApplicationAttemptId, MasterKey> masterKeysMap =
new HashMap<ApplicationAttemptId, MasterKey>(applicationMasterKeys);
result.it = new NMMemoryRecoveryIterator<Map.Entry<ApplicationAttemptId, MasterKey>>(
masterKeysMap.entrySet().iterator());
return result;
}
@ -389,14 +425,14 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
public synchronized void storeNMTokenApplicationMasterKey(
ApplicationAttemptId attempt, MasterKey key) throws IOException {
MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
nmTokenState.applicationMasterKeys.put(attempt,
applicationMasterKeys.put(attempt,
new MasterKeyPBImpl(keypb.getProto()));
}
@Override
public synchronized void removeNMTokenApplicationMasterKey(
ApplicationAttemptId attempt) throws IOException {
nmTokenState.applicationMasterKeys.remove(attempt);
applicationMasterKeys.remove(attempt);
}
@ -408,8 +444,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
new RecoveredContainerTokensState();
result.currentMasterKey = containerTokenState.currentMasterKey;
result.previousMasterKey = containerTokenState.previousMasterKey;
result.activeTokens =
new HashMap<ContainerId, Long>(containerTokenState.activeTokens);
Map<ContainerId, Long> containersTokenMap =
new HashMap<ContainerId, Long>(activeTokens);
result.it = new NMMemoryRecoveryIterator<Map.Entry<ContainerId, Long>>(
containersTokenMap.entrySet().iterator());
return result;
}
@ -432,13 +470,13 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
@Override
public synchronized void storeContainerToken(ContainerId containerId,
Long expirationTime) throws IOException {
containerTokenState.activeTokens.put(containerId, expirationTime);
activeTokens.put(containerId, expirationTime);
}
@Override
public synchronized void removeContainerToken(ContainerId containerId)
throws IOException {
containerTokenState.activeTokens.remove(containerId);
activeTokens.remove(containerId);
}

View File

@ -125,6 +125,73 @@ public class TestNMLeveldbStateStoreService {
FileUtil.fullyDelete(TMP_DIR);
}
/**
 * Drains a container-state recovery iterator into a list and closes it.
 *
 * <p>The iterator is consumed completely and closed before returning, even
 * if iteration fails, so callers can pass a freshly obtained iterator inline
 * without managing its lifetime.
 *
 * @param it iterator over recovered container states; closed by this method
 * @return the recovered container states, in iteration order
 * @throws IOException if reading from or closing the iterator fails
 */
private List<RecoveredContainerState> loadContainersState(
    RecoveryIterator<RecoveredContainerState> it) throws IOException {
  List<RecoveredContainerState> containers =
      new ArrayList<RecoveredContainerState>();
  // try-with-resources guarantees close() once the iterator is drained.
  try (RecoveryIterator<RecoveredContainerState> states = it) {
    while (states.hasNext()) {
      containers.add(states.next());
    }
  }
  return containers;
}
/**
 * Drains an application recovery iterator into a list and closes it.
 *
 * <p>The iterator is consumed completely and closed before returning, even
 * if iteration fails.
 *
 * @param it iterator over recovered application protos; closed by this
 *        method
 * @return the recovered application protos, in iteration order
 * @throws IOException if reading from or closing the iterator fails
 */
private List<ContainerManagerApplicationProto> loadApplicationProtos(
    RecoveryIterator<ContainerManagerApplicationProto> it)
    throws IOException {
  List<ContainerManagerApplicationProto> applicationProtos =
      new ArrayList<ContainerManagerApplicationProto>();
  // try-with-resources guarantees close() once the iterator is drained.
  try (RecoveryIterator<ContainerManagerApplicationProto> protos = it) {
    while (protos.hasNext()) {
      applicationProtos.add(protos.next());
    }
  }
  return applicationProtos;
}
/**
 * Drains a deletion-task recovery iterator into a list and closes it.
 *
 * <p>The iterator is consumed completely and closed before returning, even
 * if iteration fails.
 *
 * @param it iterator over recovered deletion task protos; closed by this
 *        method
 * @return the recovered deletion task protos, in iteration order
 * @throws IOException if reading from or closing the iterator fails
 */
private List<DeletionServiceDeleteTaskProto> loadDeletionTaskProtos(
    RecoveryIterator<DeletionServiceDeleteTaskProto> it) throws IOException {
  List<DeletionServiceDeleteTaskProto> deleteTaskProtos =
      new ArrayList<DeletionServiceDeleteTaskProto>();
  // try-with-resources guarantees close() once the iterator is drained.
  try (RecoveryIterator<DeletionServiceDeleteTaskProto> tasks = it) {
    while (tasks.hasNext()) {
      deleteTaskProtos.add(tasks.next());
    }
  }
  return deleteTaskProtos;
}
/**
 * Drains a user-resources recovery iterator into a map and closes it.
 *
 * <p>Each entry's key and value are copied into a new {@link HashMap}; a
 * later entry with the same key replaces an earlier one. The iterator is
 * closed before returning, even if iteration fails.
 *
 * @param it iterator over (user, recovered resources) entries; closed by
 *        this method
 * @return a new map holding every entry produced by the iterator
 * @throws IOException if reading from or closing the iterator fails
 */
private Map<String, RecoveredUserResources> loadUserResources(
    RecoveryIterator<Map.Entry<String, RecoveredUserResources>> it)
    throws IOException {
  Map<String, RecoveredUserResources> userResources =
      new HashMap<String, RecoveredUserResources>();
  // try-with-resources guarantees close() once the iterator is drained.
  try (RecoveryIterator<Map.Entry<String, RecoveredUserResources>> entries =
      it) {
    while (entries.hasNext()) {
      Map.Entry<String, RecoveredUserResources> entry = entries.next();
      userResources.put(entry.getKey(), entry.getValue());
    }
  }
  return userResources;
}
/**
 * Drains an NM-token recovery iterator into a map and closes it.
 *
 * <p>Each entry's key and value are copied into a new {@link HashMap}; a
 * later entry with the same key replaces an earlier one. The iterator is
 * closed before returning, even if iteration fails.
 *
 * @param it iterator over (application attempt, master key) entries; closed
 *        by this method
 * @return a new map holding every entry produced by the iterator
 * @throws IOException if reading from or closing the iterator fails
 */
private Map<ApplicationAttemptId, MasterKey> loadNMTokens(
    RecoveryIterator<Map.Entry<ApplicationAttemptId, MasterKey>> it)
    throws IOException {
  Map<ApplicationAttemptId, MasterKey> nmTokens =
      new HashMap<ApplicationAttemptId, MasterKey>();
  // try-with-resources guarantees close() once the iterator is drained.
  try (RecoveryIterator<Map.Entry<ApplicationAttemptId, MasterKey>> entries =
      it) {
    while (entries.hasNext()) {
      Map.Entry<ApplicationAttemptId, MasterKey> entry = entries.next();
      nmTokens.put(entry.getKey(), entry.getValue());
    }
  }
  return nmTokens;
}
/**
 * Drains a container-token recovery iterator into a map and closes it.
 *
 * <p>Each entry's key and value are copied into a new {@link HashMap}; a
 * later entry with the same key replaces an earlier one. The iterator is
 * closed before returning, even if iteration fails.
 *
 * @param it iterator over (container id, {@code Long}) entries; closed by
 *        this method
 * @return a new map holding every entry produced by the iterator
 * @throws IOException if reading from or closing the iterator fails
 */
private Map<ContainerId, Long> loadContainerTokens(
    RecoveryIterator<Map.Entry<ContainerId, Long>> it) throws IOException {
  Map<ContainerId, Long> containerTokens =
      new HashMap<ContainerId, Long>();
  // try-with-resources guarantees close() once the iterator is drained.
  try (RecoveryIterator<Map.Entry<ContainerId, Long>> entries = it) {
    while (entries.hasNext()) {
      Map.Entry<ContainerId, Long> entry = entries.next();
      containerTokens.put(entry.getKey(), entry.getValue());
    }
  }
  return containerTokens;
}
private void restartStateStore() throws IOException {
// need to close so leveldb releases database lock
if (stateStore != null) {
@ -142,7 +209,7 @@ public class TestNMLeveldbStateStoreService {
assertNotNull(pubts);
assertTrue(pubts.getLocalizedResources().isEmpty());
assertTrue(pubts.getInProgressResources().isEmpty());
assertTrue(state.getUserResources().isEmpty());
assertTrue(loadUserResources(state.getIterator()).isEmpty());
}
@Test
@ -183,7 +250,7 @@ public class TestNMLeveldbStateStoreService {
restartStateStore();
Assert.fail("Incompatible version, should expect fail here.");
} catch (ServiceStateException e) {
Assert.assertTrue("Exception message mismatch",
Assert.assertTrue("Exception message mismatch",
e.getMessage().contains("Incompatible version for NM state:"));
}
}
@ -192,7 +259,9 @@ public class TestNMLeveldbStateStoreService {
public void testApplicationStorage() throws IOException {
// test empty when no state
RecoveredApplicationsState state = stateStore.loadApplicationsState();
assertTrue(state.getApplications().isEmpty());
List<ContainerManagerApplicationProto> apps =
loadApplicationProtos(state.getIterator());
assertTrue(apps.isEmpty());
// store an application and verify recovered
final ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
@ -204,8 +273,9 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeApplication(appId1, appProto1);
restartStateStore();
state = stateStore.loadApplicationsState();
assertEquals(1, state.getApplications().size());
assertEquals(appProto1, state.getApplications().get(0));
apps = loadApplicationProtos(state.getIterator());
assertEquals(1, apps.size());
assertEquals(appProto1, apps.get(0));
// add a new app
final ApplicationId appId2 = ApplicationId.newInstance(1234, 2);
@ -216,23 +286,25 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeApplication(appId2, appProto2);
restartStateStore();
state = stateStore.loadApplicationsState();
assertEquals(2, state.getApplications().size());
assertTrue(state.getApplications().contains(appProto1));
assertTrue(state.getApplications().contains(appProto2));
apps = loadApplicationProtos(state.getIterator());
assertEquals(2, apps.size());
assertTrue(apps.contains(appProto1));
assertTrue(apps.contains(appProto2));
// test removing an application
stateStore.removeApplication(appId2);
restartStateStore();
state = stateStore.loadApplicationsState();
assertEquals(1, state.getApplications().size());
assertEquals(appProto1, state.getApplications().get(0));
apps = loadApplicationProtos(state.getIterator());
assertEquals(1, apps.size());
assertEquals(appProto1, apps.get(0));
}
@Test
public void testContainerStorage() throws IOException {
// test empty when no state
List<RecoveredContainerState> recoveredContainers =
stateStore.loadContainersState();
loadContainersState(stateStore.getContainerStateIterator());
assertTrue(recoveredContainers.isEmpty());
// create a container request
@ -254,7 +326,8 @@ public class TestNMLeveldbStateStoreService {
stateStore.getContainerVersionKey(containerId.toString()))));
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
RecoveredContainerState rcs = recoveredContainers.get(0);
assertEquals(0, rcs.getVersion());
@ -269,14 +342,16 @@ public class TestNMLeveldbStateStoreService {
// store a new container record without StartContainerRequest
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6);
stateStore.storeContainerLaunched(containerId1);
recoveredContainers = stateStore.loadContainersState();
recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
// check whether the new container record is discarded
assertEquals(1, recoveredContainers.size());
// queue the container, and verify recovered
stateStore.storeContainerQueued(containerId);
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus());
@ -292,7 +367,8 @@ public class TestNMLeveldbStateStoreService {
diags.append("some diags for container");
stateStore.storeContainerDiagnostics(containerId, diags);
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
@ -305,7 +381,8 @@ public class TestNMLeveldbStateStoreService {
// pause the container, and verify recovered
stateStore.storeContainerPaused(containerId);
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus());
@ -316,7 +393,8 @@ public class TestNMLeveldbStateStoreService {
// Resume the container
stateStore.removeContainerPaused(containerId);
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
// increase the container size, and verify recovered
@ -328,7 +406,8 @@ public class TestNMLeveldbStateStoreService {
stateStore
.storeContainerUpdateToken(containerId, updateTokenIdentifier);
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(0, rcs.getVersion());
@ -342,7 +421,8 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeContainerDiagnostics(containerId, diags);
stateStore.storeContainerKilled(containerId);
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
@ -358,7 +438,8 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeContainerDiagnostics(containerId, diags);
stateStore.storeContainerCompleted(containerId, 21);
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus());
@ -371,7 +452,8 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeContainerWorkDir(containerId, "/test/workdir");
stateStore.storeContainerLogDir(containerId, "/test/logdir");
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(6, rcs.getRemainingRetryAttempts());
@ -382,12 +464,13 @@ public class TestNMLeveldbStateStoreService {
// remove the container and verify not recovered
stateStore.removeContainer(containerId);
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertTrue(recoveredContainers.isEmpty());
// recover again to check remove clears all containers
restartStateStore();
NMStateStoreService nmStoreSpy = spy(stateStore);
nmStoreSpy.loadContainersState();
loadContainersState(nmStoreSpy.getContainerStateIterator());
verify(nmStoreSpy,times(0)).removeContainer(any(ContainerId.class));
}
@ -399,7 +482,8 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeContainerRestartTimes(containerId,
finishTimeForRetryAttempts);
restartStateStore();
RecoveredContainerState rcs = stateStore.loadContainersState().get(0);
RecoveredContainerState rcs =
loadContainersState(stateStore.getContainerStateIterator()).get(0);
List<Long> recoveredRestartTimes = rcs.getRestartTimes();
assertEquals(1462700529039L, (long)recoveredRestartTimes.get(0));
assertEquals(1462700529050L, (long)recoveredRestartTimes.get(1));
@ -481,7 +565,7 @@ public class TestNMLeveldbStateStoreService {
assertTrue(pubts.getLocalizedResources().isEmpty());
assertTrue(pubts.getInProgressResources().isEmpty());
Map<String, RecoveredUserResources> userResources =
state.getUserResources();
loadUserResources(state.getIterator());
assertEquals(1, userResources.size());
RecoveredUserResources rur = userResources.get(user);
LocalResourceTrackerState privts = rur.getPrivateTrackerState();
@ -535,7 +619,7 @@ public class TestNMLeveldbStateStoreService {
pubts.getInProgressResources().get(pubRsrcProto1));
assertEquals(pubRsrcLocalPath2,
pubts.getInProgressResources().get(pubRsrcProto2));
userResources = state.getUserResources();
userResources = loadUserResources(state.getIterator());
assertEquals(1, userResources.size());
rur = userResources.get(user);
privts = rur.getPrivateTrackerState();
@ -584,7 +668,7 @@ public class TestNMLeveldbStateStoreService {
assertTrue(pubts.getLocalizedResources().isEmpty());
assertTrue(pubts.getInProgressResources().isEmpty());
Map<String, RecoveredUserResources> userResources =
state.getUserResources();
loadUserResources(state.getIterator());
assertEquals(1, userResources.size());
RecoveredUserResources rur = userResources.get(user);
LocalResourceTrackerState privts = rur.getPrivateTrackerState();
@ -654,7 +738,7 @@ public class TestNMLeveldbStateStoreService {
assertEquals(1, pubts.getInProgressResources().size());
assertEquals(pubRsrcLocalPath2,
pubts.getInProgressResources().get(pubRsrcProto2));
userResources = state.getUserResources();
userResources = loadUserResources(state.getIterator());
assertEquals(1, userResources.size());
rur = userResources.get(user);
privts = rur.getPrivateTrackerState();
@ -762,7 +846,7 @@ public class TestNMLeveldbStateStoreService {
assertEquals(pubLocalizedProto1,
pubts.getLocalizedResources().iterator().next());
Map<String, RecoveredUserResources> userResources =
state.getUserResources();
loadUserResources(state.getIterator());
assertTrue(userResources.isEmpty());
}
@ -771,7 +855,9 @@ public class TestNMLeveldbStateStoreService {
// test empty when no state
RecoveredDeletionServiceState state =
stateStore.loadDeletionServiceState();
assertTrue(state.getTasks().isEmpty());
List<DeletionServiceDeleteTaskProto> deleteTaskProtos =
loadDeletionTaskProtos(state.getIterator());
assertTrue(deleteTaskProtos.isEmpty());
// store a deletion task and verify recovered
DeletionServiceDeleteTaskProto proto =
@ -788,8 +874,9 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeDeletionTask(proto.getId(), proto);
restartStateStore();
state = stateStore.loadDeletionServiceState();
assertEquals(1, state.getTasks().size());
assertEquals(proto, state.getTasks().get(0));
deleteTaskProtos = loadDeletionTaskProtos(state.getIterator());
assertEquals(1, deleteTaskProtos.size());
assertEquals(proto, deleteTaskProtos.get(0));
// store another deletion task
DeletionServiceDeleteTaskProto proto2 =
@ -802,31 +889,36 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeDeletionTask(proto2.getId(), proto2);
restartStateStore();
state = stateStore.loadDeletionServiceState();
assertEquals(2, state.getTasks().size());
assertTrue(state.getTasks().contains(proto));
assertTrue(state.getTasks().contains(proto2));
deleteTaskProtos = loadDeletionTaskProtos(state.getIterator());
assertEquals(2, deleteTaskProtos.size());
assertTrue(deleteTaskProtos.contains(proto));
assertTrue(deleteTaskProtos.contains(proto2));
// delete a task and verify gone after recovery
stateStore.removeDeletionTask(proto2.getId());
restartStateStore();
state = stateStore.loadDeletionServiceState();
assertEquals(1, state.getTasks().size());
assertEquals(proto, state.getTasks().get(0));
state = stateStore.loadDeletionServiceState();
deleteTaskProtos = loadDeletionTaskProtos(state.getIterator());
assertEquals(1, deleteTaskProtos.size());
assertEquals(proto, deleteTaskProtos.get(0));
// delete the last task and verify none left
stateStore.removeDeletionTask(proto.getId());
restartStateStore();
state = stateStore.loadDeletionServiceState();
assertTrue(state.getTasks().isEmpty());
}
deleteTaskProtos = loadDeletionTaskProtos(state.getIterator());
assertTrue(deleteTaskProtos.isEmpty()); }
@Test
public void testNMTokenStorage() throws IOException {
// test empty when no state
RecoveredNMTokensState state = stateStore.loadNMTokensState();
Map<ApplicationAttemptId, MasterKey> loadedAppKeys =
loadNMTokens(state.getIterator());
assertNull(state.getCurrentMasterKey());
assertNull(state.getPreviousMasterKey());
assertTrue(state.getApplicationMasterKeys().isEmpty());
assertTrue(loadedAppKeys.isEmpty());
// store a master key and verify recovered
NMTokenSecretManagerForTest secretMgr = new NMTokenSecretManagerForTest();
@ -834,18 +926,20 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeNMTokenCurrentMasterKey(currentKey);
restartStateStore();
state = stateStore.loadNMTokensState();
loadedAppKeys = loadNMTokens(state.getIterator());
assertEquals(currentKey, state.getCurrentMasterKey());
assertNull(state.getPreviousMasterKey());
assertTrue(state.getApplicationMasterKeys().isEmpty());
assertTrue(loadedAppKeys.isEmpty());
// store a previous key and verify recovered
MasterKey prevKey = secretMgr.generateKey();
stateStore.storeNMTokenPreviousMasterKey(prevKey);
restartStateStore();
state = stateStore.loadNMTokensState();
loadedAppKeys = loadNMTokens(state.getIterator());
assertEquals(currentKey, state.getCurrentMasterKey());
assertEquals(prevKey, state.getPreviousMasterKey());
assertTrue(state.getApplicationMasterKeys().isEmpty());
assertTrue(loadedAppKeys.isEmpty());
// store a few application keys and verify recovered
ApplicationAttemptId attempt1 = ApplicationAttemptId.newInstance(
@ -858,10 +952,9 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2);
restartStateStore();
state = stateStore.loadNMTokensState();
loadedAppKeys = loadNMTokens(state.getIterator());
assertEquals(currentKey, state.getCurrentMasterKey());
assertEquals(prevKey, state.getPreviousMasterKey());
Map<ApplicationAttemptId, MasterKey> loadedAppKeys =
state.getApplicationMasterKeys();
assertEquals(2, loadedAppKeys.size());
assertEquals(attemptKey1, loadedAppKeys.get(attempt1));
assertEquals(attemptKey2, loadedAppKeys.get(attempt2));
@ -880,9 +973,9 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeNMTokenCurrentMasterKey(currentKey);
restartStateStore();
state = stateStore.loadNMTokensState();
loadedAppKeys = loadNMTokens(state.getIterator());
assertEquals(currentKey, state.getCurrentMasterKey());
assertEquals(prevKey, state.getPreviousMasterKey());
loadedAppKeys = state.getApplicationMasterKeys();
assertEquals(2, loadedAppKeys.size());
assertNull(loadedAppKeys.get(attempt1));
assertEquals(attemptKey2, loadedAppKeys.get(attempt2));
@ -894,9 +987,10 @@ public class TestNMLeveldbStateStoreService {
// test empty when no state
RecoveredContainerTokensState state =
stateStore.loadContainerTokensState();
Map<ContainerId, Long> loadedActiveTokens = loadContainerTokens(state.it);
assertNull(state.getCurrentMasterKey());
assertNull(state.getPreviousMasterKey());
assertTrue(state.getActiveTokens().isEmpty());
assertTrue(loadedActiveTokens.isEmpty());
// store a master key and verify recovered
ContainerTokenKeyGeneratorForTest keygen =
@ -905,18 +999,20 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeContainerTokenCurrentMasterKey(currentKey);
restartStateStore();
state = stateStore.loadContainerTokensState();
loadedActiveTokens = loadContainerTokens(state.it);
assertEquals(currentKey, state.getCurrentMasterKey());
assertNull(state.getPreviousMasterKey());
assertTrue(state.getActiveTokens().isEmpty());
assertTrue(loadedActiveTokens.isEmpty());
// store a previous key and verify recovered
MasterKey prevKey = keygen.generateKey();
stateStore.storeContainerTokenPreviousMasterKey(prevKey);
restartStateStore();
state = stateStore.loadContainerTokensState();
loadedActiveTokens = loadContainerTokens(state.it);
assertEquals(currentKey, state.getCurrentMasterKey());
assertEquals(prevKey, state.getPreviousMasterKey());
assertTrue(state.getActiveTokens().isEmpty());
assertTrue(loadedActiveTokens.isEmpty());
// store a few container tokens and verify recovered
ContainerId cid1 = BuilderUtils.newContainerId(1, 1, 1, 1);
@ -927,10 +1023,9 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeContainerToken(cid2, expTime2);
restartStateStore();
state = stateStore.loadContainerTokensState();
loadedActiveTokens = loadContainerTokens(state.it);
assertEquals(currentKey, state.getCurrentMasterKey());
assertEquals(prevKey, state.getPreviousMasterKey());
Map<ContainerId, Long> loadedActiveTokens =
state.getActiveTokens();
assertEquals(2, loadedActiveTokens.size());
assertEquals(expTime1, loadedActiveTokens.get(cid1));
assertEquals(expTime2, loadedActiveTokens.get(cid2));
@ -948,9 +1043,9 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeContainerTokenCurrentMasterKey(currentKey);
restartStateStore();
state = stateStore.loadContainerTokensState();
loadedActiveTokens = loadContainerTokens(state.it);
assertEquals(currentKey, state.getCurrentMasterKey());
assertEquals(prevKey, state.getPreviousMasterKey());
loadedActiveTokens = state.getActiveTokens();
assertEquals(2, loadedActiveTokens.size());
assertNull(loadedActiveTokens.get(cid1));
assertEquals(expTime2, loadedActiveTokens.get(cid2));
@ -1029,8 +1124,8 @@ public class TestNMLeveldbStateStoreService {
@Test
public void testUnexpectedKeyDoesntThrowException() throws IOException {
// test empty when no state
List<RecoveredContainerState> recoveredContainers = stateStore
.loadContainersState();
List<RecoveredContainerState> recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertTrue(recoveredContainers.isEmpty());
ApplicationId appId = ApplicationId.newInstance(1234, 3);
@ -1045,7 +1140,8 @@ public class TestNMLeveldbStateStoreService {
+ containerId.toString() + "/invalidKey1234").getBytes();
stateStore.getDB().put(invalidKey, new byte[1]);
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
RecoveredContainerState rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
@ -1162,8 +1258,8 @@ public class TestNMLeveldbStateStoreService {
@Test
public void testStateStoreForResourceMapping() throws IOException {
// test empty when no state
List<RecoveredContainerState> recoveredContainers = stateStore
.loadContainersState();
List<RecoveredContainerState> recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertTrue(recoveredContainers.isEmpty());
ApplicationId appId = ApplicationId.newInstance(1234, 3);
@ -1190,7 +1286,8 @@ public class TestNMLeveldbStateStoreService {
// add a invalid key
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
RecoveredContainerState rcs = recoveredContainers.get(0);
List<Serializable> res = rcs.getResourceMappings()
@ -1253,7 +1350,8 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeContainerRestartTimes(containerId,
restartTimes);
restartStateStore();
RecoveredContainerState rcs = stateStore.loadContainersState().get(0);
RecoveredContainerState rcs =
loadContainersState(stateStore.getContainerStateIterator()).get(0);
List<Long> recoveredRestartTimes = rcs.getRestartTimes();
assertTrue(recoveredRestartTimes.isEmpty());
}