YARN-8242. YARN NM: OOM error while reading back the state store on recovery. Contributed by Pradeep Ambati and Kanwaljeet Sachdev
This commit is contained in:
parent
01ff817814
commit
65e7469712
|
@ -19,13 +19,14 @@
|
|||
package org.apache.hadoop.yarn.server.nodemanager;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
|
@ -96,16 +97,20 @@ public class DeletionService extends AbstractService {
|
|||
|
||||
private void recover(NMStateStoreService.RecoveredDeletionServiceState state)
|
||||
throws IOException {
|
||||
List<DeletionServiceDeleteTaskProto> taskProtos = state.getTasks();
|
||||
Map<Integer, DeletionTaskRecoveryInfo> idToInfoMap =
|
||||
new HashMap<>(taskProtos.size());
|
||||
Set<Integer> successorTasks = new HashSet<>();
|
||||
for (DeletionServiceDeleteTaskProto proto : taskProtos) {
|
||||
DeletionTaskRecoveryInfo info =
|
||||
NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this);
|
||||
idToInfoMap.put(info.getTask().getTaskId(), info);
|
||||
nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId()));
|
||||
successorTasks.addAll(info.getSuccessorTaskIds());
|
||||
new HashMap<Integer, DeletionTaskRecoveryInfo>();
|
||||
Set<Integer> successorTasks = new HashSet<Integer>();
|
||||
|
||||
try (RecoveryIterator<DeletionServiceDeleteTaskProto> it =
|
||||
state.getIterator()) {
|
||||
while (it.hasNext()) {
|
||||
DeletionServiceDeleteTaskProto proto = it.next();
|
||||
DeletionTaskRecoveryInfo info =
|
||||
NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this);
|
||||
idToInfoMap.put(info.getTask().getTaskId(), info);
|
||||
nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId()));
|
||||
successorTasks.addAll(info.getSuccessorTaskIds());
|
||||
}
|
||||
}
|
||||
|
||||
// restore the task dependencies and schedule the deletion tasks that
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.google.protobuf.ByteString;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
|
@ -356,19 +357,26 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
stateStore.loadLocalizationState());
|
||||
|
||||
RecoveredApplicationsState appsState = stateStore.loadApplicationsState();
|
||||
for (ContainerManagerApplicationProto proto :
|
||||
appsState.getApplications()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Recovering application with state: " + proto.toString());
|
||||
try (RecoveryIterator<ContainerManagerApplicationProto> rasIterator =
|
||||
appsState.getIterator()) {
|
||||
while (rasIterator.hasNext()) {
|
||||
ContainerManagerApplicationProto proto = rasIterator.next();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Recovering application with state: " + proto.toString());
|
||||
}
|
||||
recoverApplication(proto);
|
||||
}
|
||||
recoverApplication(proto);
|
||||
}
|
||||
|
||||
for (RecoveredContainerState rcs : stateStore.loadContainersState()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Recovering container with state: " + rcs);
|
||||
try (RecoveryIterator<RecoveredContainerState> rcsIterator =
|
||||
stateStore.getContainerStateIterator()) {
|
||||
while (rcsIterator.hasNext()) {
|
||||
RecoveredContainerState rcs = rcsIterator.next();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Recovering container with state: " + rcs);
|
||||
}
|
||||
recoverContainer(rcs);
|
||||
}
|
||||
recoverContainer(rcs);
|
||||
}
|
||||
|
||||
// Recovery AMRMProxy state after apps and containers are recovered
|
||||
|
|
|
@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
|||
|
||||
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
||||
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
||||
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -295,42 +297,46 @@ public class ResourceLocalizationService extends CompositeService
|
|||
|
||||
//Recover localized resources after an NM restart
|
||||
public void recoverLocalizedResources(RecoveredLocalizationState state)
|
||||
throws URISyntaxException {
|
||||
throws URISyntaxException, IOException {
|
||||
LocalResourceTrackerState trackerState = state.getPublicTrackerState();
|
||||
recoverTrackerResources(publicRsrc, trackerState);
|
||||
|
||||
for (Map.Entry<String, RecoveredUserResources> userEntry :
|
||||
state.getUserResources().entrySet()) {
|
||||
String user = userEntry.getKey();
|
||||
RecoveredUserResources userResources = userEntry.getValue();
|
||||
trackerState = userResources.getPrivateTrackerState();
|
||||
if (!trackerState.isEmpty()) {
|
||||
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
|
||||
null, dispatcher, true, super.getConfig(), stateStore, dirsHandler);
|
||||
LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user,
|
||||
tracker);
|
||||
if (oldTracker != null) {
|
||||
tracker = oldTracker;
|
||||
}
|
||||
recoverTrackerResources(tracker, trackerState);
|
||||
}
|
||||
|
||||
for (Map.Entry<ApplicationId, LocalResourceTrackerState> appEntry :
|
||||
userResources.getAppTrackerStates().entrySet()) {
|
||||
trackerState = appEntry.getValue();
|
||||
try (RecoveryIterator<Map.Entry<String, RecoveredUserResources>> it
|
||||
= state.getIterator()) {
|
||||
while (it.hasNext()) {
|
||||
Map.Entry<String, RecoveredUserResources> userEntry = it.next();
|
||||
String user = userEntry.getKey();
|
||||
RecoveredUserResources userResources = userEntry.getValue();
|
||||
trackerState = userResources.getPrivateTrackerState();
|
||||
if (!trackerState.isEmpty()) {
|
||||
ApplicationId appId = appEntry.getKey();
|
||||
String appIdStr = appId.toString();
|
||||
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
|
||||
appId, dispatcher, false, super.getConfig(), stateStore,
|
||||
null, dispatcher, true, super.getConfig(), stateStore,
|
||||
dirsHandler);
|
||||
LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr,
|
||||
LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user,
|
||||
tracker);
|
||||
if (oldTracker != null) {
|
||||
tracker = oldTracker;
|
||||
}
|
||||
recoverTrackerResources(tracker, trackerState);
|
||||
}
|
||||
|
||||
for (Map.Entry<ApplicationId, LocalResourceTrackerState> appEntry :
|
||||
userResources.getAppTrackerStates().entrySet()) {
|
||||
trackerState = appEntry.getValue();
|
||||
if (!trackerState.isEmpty()) {
|
||||
ApplicationId appId = appEntry.getKey();
|
||||
String appIdStr = appId.toString();
|
||||
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
|
||||
appId, dispatcher, false, super.getConfig(), stateStore,
|
||||
dirsHandler);
|
||||
LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr,
|
||||
tracker);
|
||||
if (oldTracker != null) {
|
||||
tracker = oldTracker;
|
||||
}
|
||||
recoverTrackerResources(tracker, trackerState);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -556,7 +562,7 @@ public class ResourceLocalizationService extends CompositeService
|
|||
rsrcCleanup.getResources();
|
||||
for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
|
||||
rsrcs.entrySet()) {
|
||||
LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
|
||||
LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
|
||||
c.getContainerId().getApplicationAttemptId()
|
||||
.getApplicationId());
|
||||
for (LocalResourceRequest req : e.getValue()) {
|
||||
|
|
|
@ -66,6 +66,7 @@ import org.slf4j.LoggerFactory;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.AbstractMap;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
|
@ -73,6 +74,7 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Set;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
|
@ -225,68 +227,119 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
return isHealthy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<RecoveredContainerState> loadContainersState()
|
||||
// LeveldbIterator starting at startkey
|
||||
private LeveldbIterator getLevelDBIterator(String startKey)
|
||||
throws IOException {
|
||||
ArrayList<RecoveredContainerState> containers =
|
||||
new ArrayList<RecoveredContainerState>();
|
||||
ArrayList<ContainerId> containersToRemove =
|
||||
new ArrayList<ContainerId>();
|
||||
LeveldbIterator iter = null;
|
||||
try {
|
||||
iter = new LeveldbIterator(db);
|
||||
iter.seek(bytes(CONTAINERS_KEY_PREFIX));
|
||||
LeveldbIterator it = new LeveldbIterator(db);
|
||||
it.seek(bytes(startKey));
|
||||
return it;
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
while (iter.hasNext()) {
|
||||
Entry<byte[], byte[]> entry = iter.peekNext();
|
||||
// Base Recovery Iterator
|
||||
private abstract class BaseRecoveryIterator<T> implements
|
||||
RecoveryIterator<T> {
|
||||
LeveldbIterator it;
|
||||
T nextItem;
|
||||
|
||||
BaseRecoveryIterator(String dbKey) throws IOException {
|
||||
this.it = getLevelDBIterator(dbKey);
|
||||
this.nextItem = null;
|
||||
}
|
||||
|
||||
protected abstract T getNextItem(LeveldbIterator it) throws IOException;
|
||||
|
||||
@Override
|
||||
public boolean hasNext() throws IOException {
|
||||
if (nextItem == null) {
|
||||
nextItem = getNextItem(it);
|
||||
}
|
||||
return (nextItem != null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public T next() throws IOException, NoSuchElementException {
|
||||
T tmp = nextItem;
|
||||
if (tmp != null) {
|
||||
nextItem = null;
|
||||
return tmp;
|
||||
} else {
|
||||
tmp = getNextItem(it);
|
||||
if (tmp == null) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
return tmp;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (it != null) {
|
||||
it.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Container Recovery Iterator
|
||||
private class ContainerStateIterator extends
|
||||
BaseRecoveryIterator<RecoveredContainerState> {
|
||||
ContainerStateIterator() throws IOException {
|
||||
super(CONTAINERS_KEY_PREFIX);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RecoveredContainerState getNextItem(LeveldbIterator it)
|
||||
throws IOException {
|
||||
return getNextRecoveredContainer(it);
|
||||
}
|
||||
}
|
||||
|
||||
private RecoveredContainerState getNextRecoveredContainer(LeveldbIterator it)
|
||||
throws IOException {
|
||||
RecoveredContainerState rcs = null;
|
||||
try {
|
||||
while (it.hasNext()) {
|
||||
Entry<byte[], byte[]> entry = it.peekNext();
|
||||
String key = asString(entry.getKey());
|
||||
if (!key.startsWith(CONTAINERS_KEY_PREFIX)) {
|
||||
break;
|
||||
return null;
|
||||
}
|
||||
|
||||
int idEndPos = key.indexOf('/', CONTAINERS_KEY_PREFIX.length());
|
||||
if (idEndPos < 0) {
|
||||
throw new IOException("Unable to determine container in key: " + key);
|
||||
}
|
||||
ContainerId containerId = ContainerId.fromString(
|
||||
key.substring(CONTAINERS_KEY_PREFIX.length(), idEndPos));
|
||||
String keyPrefix = key.substring(0, idEndPos+1);
|
||||
RecoveredContainerState rcs = loadContainerState(containerId,
|
||||
iter, keyPrefix);
|
||||
// Don't load container without StartContainerRequest
|
||||
String keyPrefix = key.substring(0, idEndPos + 1);
|
||||
rcs = loadContainerState(it, keyPrefix);
|
||||
if (rcs.startRequest != null) {
|
||||
containers.add(rcs);
|
||||
break;
|
||||
} else {
|
||||
containersToRemove.add(containerId);
|
||||
removeContainer(rcs.getContainerId());
|
||||
rcs = null;
|
||||
}
|
||||
}
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
if (iter != null) {
|
||||
iter.close();
|
||||
}
|
||||
}
|
||||
|
||||
// remove container without StartContainerRequest
|
||||
for (ContainerId containerId : containersToRemove) {
|
||||
LOG.warn("Remove container " + containerId +
|
||||
" with incomplete records");
|
||||
try {
|
||||
removeContainer(containerId);
|
||||
// TODO: kill and cleanup the leaked container
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to remove container " + containerId +
|
||||
" in store", e);
|
||||
}
|
||||
}
|
||||
|
||||
return containers;
|
||||
return rcs;
|
||||
}
|
||||
|
||||
private RecoveredContainerState loadContainerState(ContainerId containerId,
|
||||
LeveldbIterator iter, String keyPrefix) throws IOException {
|
||||
RecoveredContainerState rcs = new RecoveredContainerState();
|
||||
|
||||
@Override
|
||||
public RecoveryIterator<RecoveredContainerState> getContainerStateIterator()
|
||||
throws IOException {
|
||||
return new ContainerStateIterator();
|
||||
}
|
||||
|
||||
private RecoveredContainerState loadContainerState(LeveldbIterator iter,
|
||||
String keyPrefix) throws IOException {
|
||||
ContainerId containerId = ContainerId.fromString(
|
||||
keyPrefix.substring(CONTAINERS_KEY_PREFIX.length(),
|
||||
keyPrefix.length()-1));
|
||||
RecoveredContainerState rcs = new RecoveredContainerState(containerId);
|
||||
rcs.status = RecoveredContainerStatus.REQUESTED;
|
||||
while (iter.hasNext()) {
|
||||
Entry<byte[],byte[]> entry = iter.peekNext();
|
||||
|
@ -680,35 +733,45 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
}
|
||||
|
||||
|
||||
// Application Recovery Iterator
|
||||
private class ApplicationStateIterator extends
|
||||
BaseRecoveryIterator<ContainerManagerApplicationProto> {
|
||||
ApplicationStateIterator() throws IOException {
|
||||
super(APPLICATIONS_KEY_PREFIX);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ContainerManagerApplicationProto getNextItem(LeveldbIterator it)
|
||||
throws IOException {
|
||||
return getNextRecoveredApplication(it);
|
||||
}
|
||||
}
|
||||
|
||||
private ContainerManagerApplicationProto getNextRecoveredApplication(
|
||||
LeveldbIterator it) throws IOException {
|
||||
ContainerManagerApplicationProto applicationProto = null;
|
||||
try {
|
||||
if (it.hasNext()) {
|
||||
Entry<byte[], byte[]> entry = it.next();
|
||||
String key = asString(entry.getKey());
|
||||
if (!key.startsWith(APPLICATIONS_KEY_PREFIX)) {
|
||||
return null;
|
||||
}
|
||||
applicationProto = ContainerManagerApplicationProto.parseFrom(
|
||||
entry.getValue());
|
||||
}
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
return applicationProto;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecoveredApplicationsState loadApplicationsState()
|
||||
throws IOException {
|
||||
RecoveredApplicationsState state = new RecoveredApplicationsState();
|
||||
state.applications = new ArrayList<ContainerManagerApplicationProto>();
|
||||
String keyPrefix = APPLICATIONS_KEY_PREFIX;
|
||||
LeveldbIterator iter = null;
|
||||
try {
|
||||
iter = new LeveldbIterator(db);
|
||||
iter.seek(bytes(keyPrefix));
|
||||
while (iter.hasNext()) {
|
||||
Entry<byte[], byte[]> entry = iter.next();
|
||||
String key = asString(entry.getKey());
|
||||
if (!key.startsWith(keyPrefix)) {
|
||||
break;
|
||||
}
|
||||
state.applications.add(
|
||||
ContainerManagerApplicationProto.parseFrom(entry.getValue()));
|
||||
}
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
if (iter != null) {
|
||||
iter.close();
|
||||
}
|
||||
}
|
||||
|
||||
state.it = new ApplicationStateIterator();
|
||||
cleanupDeprecatedFinishedApps();
|
||||
|
||||
return state;
|
||||
}
|
||||
|
||||
|
@ -752,24 +815,29 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
}
|
||||
|
||||
|
||||
@Override
|
||||
public RecoveredLocalizationState loadLocalizationState()
|
||||
throws IOException {
|
||||
RecoveredLocalizationState state = new RecoveredLocalizationState();
|
||||
// User Resource Recovery Iterator.
|
||||
private class UserResourcesIterator extends
|
||||
BaseRecoveryIterator<Entry<String, RecoveredUserResources>> {
|
||||
UserResourcesIterator() throws IOException {
|
||||
super(LOCALIZATION_PRIVATE_KEY_PREFIX);
|
||||
}
|
||||
|
||||
LeveldbIterator iter = null;
|
||||
@Override
|
||||
protected Entry<String, RecoveredUserResources> getNextItem(
|
||||
LeveldbIterator it) throws IOException {
|
||||
return getNextRecoveredPrivateLocalizationEntry(it);
|
||||
}
|
||||
}
|
||||
|
||||
private Entry<String, RecoveredUserResources> getNextRecoveredPrivateLocalizationEntry(
|
||||
LeveldbIterator it) throws IOException {
|
||||
Entry<String, RecoveredUserResources> localEntry = null;
|
||||
try {
|
||||
iter = new LeveldbIterator(db);
|
||||
iter.seek(bytes(LOCALIZATION_PUBLIC_KEY_PREFIX));
|
||||
state.publicTrackerState = loadResourceTrackerState(iter,
|
||||
LOCALIZATION_PUBLIC_KEY_PREFIX);
|
||||
|
||||
iter.seek(bytes(LOCALIZATION_PRIVATE_KEY_PREFIX));
|
||||
while (iter.hasNext()) {
|
||||
Entry<byte[],byte[]> entry = iter.peekNext();
|
||||
if (it.hasNext()) {
|
||||
Entry<byte[], byte[]> entry = it.peekNext();
|
||||
String key = asString(entry.getKey());
|
||||
if (!key.startsWith(LOCALIZATION_PRIVATE_KEY_PREFIX)) {
|
||||
break;
|
||||
return null;
|
||||
}
|
||||
|
||||
int userEndPos = key.indexOf('/',
|
||||
|
@ -780,17 +848,24 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
}
|
||||
String user = key.substring(
|
||||
LOCALIZATION_PRIVATE_KEY_PREFIX.length(), userEndPos);
|
||||
state.userResources.put(user, loadUserLocalizedResources(iter,
|
||||
key.substring(0, userEndPos+1)));
|
||||
RecoveredUserResources val = loadUserLocalizedResources(it,
|
||||
key.substring(0, userEndPos+1));
|
||||
localEntry = new AbstractMap.SimpleEntry<>(user, val);
|
||||
}
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
if (iter != null) {
|
||||
iter.close();
|
||||
}
|
||||
}
|
||||
return localEntry;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecoveredLocalizationState loadLocalizationState()
|
||||
throws IOException {
|
||||
RecoveredLocalizationState state = new RecoveredLocalizationState();
|
||||
LeveldbIterator it = getLevelDBIterator(LOCALIZATION_PUBLIC_KEY_PREFIX);
|
||||
state.publicTrackerState = loadResourceTrackerState(it,
|
||||
LOCALIZATION_PUBLIC_KEY_PREFIX);
|
||||
state.it = new UserResourcesIterator();
|
||||
return state;
|
||||
}
|
||||
|
||||
|
@ -800,7 +875,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
final String startedPrefix = keyPrefix + LOCALIZATION_STARTED_SUFFIX;
|
||||
LocalResourceTrackerState state = new LocalResourceTrackerState();
|
||||
while (iter.hasNext()) {
|
||||
Entry<byte[],byte[]> entry = iter.peekNext();
|
||||
Entry<byte[], byte[]> entry = iter.peekNext();
|
||||
String key = asString(entry.getKey());
|
||||
if (!key.startsWith(keyPrefix)) {
|
||||
break;
|
||||
|
@ -981,32 +1056,44 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
+ LOCALIZATION_APPCACHE_SUFFIX + appId + "/";
|
||||
}
|
||||
|
||||
// Deletion State Recovery Iterator.
|
||||
private class DeletionStateIterator extends
|
||||
BaseRecoveryIterator<DeletionServiceDeleteTaskProto> {
|
||||
DeletionStateIterator() throws IOException {
|
||||
super(DELETION_TASK_KEY_PREFIX);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DeletionServiceDeleteTaskProto getNextItem(LeveldbIterator it)
|
||||
throws IOException {
|
||||
return getNextRecoveredDeletionService(it);
|
||||
}
|
||||
}
|
||||
|
||||
private DeletionServiceDeleteTaskProto getNextRecoveredDeletionService(
|
||||
LeveldbIterator it) throws IOException {
|
||||
DeletionServiceDeleteTaskProto deleteProto = null;
|
||||
try {
|
||||
if (it.hasNext()) {
|
||||
Entry<byte[], byte[]> entry = it.next();
|
||||
String key = asString(entry.getKey());
|
||||
if (!key.startsWith(DELETION_TASK_KEY_PREFIX)) {
|
||||
return null;
|
||||
}
|
||||
deleteProto = DeletionServiceDeleteTaskProto.parseFrom(
|
||||
entry.getValue());
|
||||
}
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
return deleteProto;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecoveredDeletionServiceState loadDeletionServiceState()
|
||||
throws IOException {
|
||||
RecoveredDeletionServiceState state = new RecoveredDeletionServiceState();
|
||||
state.tasks = new ArrayList<DeletionServiceDeleteTaskProto>();
|
||||
LeveldbIterator iter = null;
|
||||
try {
|
||||
iter = new LeveldbIterator(db);
|
||||
iter.seek(bytes(DELETION_TASK_KEY_PREFIX));
|
||||
while (iter.hasNext()) {
|
||||
Entry<byte[], byte[]> entry = iter.next();
|
||||
String key = asString(entry.getKey());
|
||||
if (!key.startsWith(DELETION_TASK_KEY_PREFIX)) {
|
||||
break;
|
||||
}
|
||||
state.tasks.add(
|
||||
DeletionServiceDeleteTaskProto.parseFrom(entry.getValue()));
|
||||
}
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
if (iter != null) {
|
||||
iter.close();
|
||||
}
|
||||
}
|
||||
state.it = new DeletionStateIterator();
|
||||
return state;
|
||||
}
|
||||
|
||||
|
@ -1033,29 +1120,44 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
}
|
||||
}
|
||||
|
||||
private MasterKey getMasterKey(String dbKey) throws IOException {
|
||||
try{
|
||||
byte[] data = db.get(bytes(dbKey));
|
||||
if (data == null || data.length == 0) {
|
||||
return null;
|
||||
}
|
||||
return parseMasterKey(data);
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecoveredNMTokensState loadNMTokensState() throws IOException {
|
||||
RecoveredNMTokensState state = new RecoveredNMTokensState();
|
||||
state.applicationMasterKeys =
|
||||
new HashMap<ApplicationAttemptId, MasterKey>();
|
||||
LeveldbIterator iter = null;
|
||||
// Recover NMTokens Iterator
|
||||
private class NMTokensStateIterator extends
|
||||
BaseRecoveryIterator<Entry<ApplicationAttemptId, MasterKey>> {
|
||||
NMTokensStateIterator() throws IOException {
|
||||
super(NM_TOKENS_KEY_PREFIX);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Entry<ApplicationAttemptId, MasterKey> getNextItem(
|
||||
LeveldbIterator it) throws IOException {
|
||||
return getNextMasterKeyEntry(it);
|
||||
}
|
||||
}
|
||||
|
||||
private Entry<ApplicationAttemptId, MasterKey> getNextMasterKeyEntry(
|
||||
LeveldbIterator it) throws IOException {
|
||||
Entry<ApplicationAttemptId, MasterKey> masterKeyentry = null;
|
||||
try {
|
||||
iter = new LeveldbIterator(db);
|
||||
iter.seek(bytes(NM_TOKENS_KEY_PREFIX));
|
||||
while (iter.hasNext()) {
|
||||
Entry<byte[], byte[]> entry = iter.next();
|
||||
while (it.hasNext()) {
|
||||
Entry<byte[], byte[]> entry = it.next();
|
||||
String fullKey = asString(entry.getKey());
|
||||
if (!fullKey.startsWith(NM_TOKENS_KEY_PREFIX)) {
|
||||
break;
|
||||
}
|
||||
String key = fullKey.substring(NM_TOKENS_KEY_PREFIX.length());
|
||||
if (key.equals(CURRENT_MASTER_KEY_SUFFIX)) {
|
||||
state.currentMasterKey = parseMasterKey(entry.getValue());
|
||||
} else if (key.equals(PREV_MASTER_KEY_SUFFIX)) {
|
||||
state.previousMasterKey = parseMasterKey(entry.getValue());
|
||||
} else if (key.startsWith(
|
||||
ApplicationAttemptId.appAttemptIdStrPrefix)) {
|
||||
if (key.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
|
||||
ApplicationAttemptId attempt;
|
||||
try {
|
||||
attempt = ApplicationAttemptId.fromString(key);
|
||||
|
@ -1063,17 +1165,25 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
throw new IOException("Bad application master key state for "
|
||||
+ fullKey, e);
|
||||
}
|
||||
state.applicationMasterKeys.put(attempt,
|
||||
masterKeyentry = new AbstractMap.SimpleEntry<>(attempt,
|
||||
parseMasterKey(entry.getValue()));
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
if (iter != null) {
|
||||
iter.close();
|
||||
}
|
||||
}
|
||||
return masterKeyentry;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecoveredNMTokensState loadNMTokensState() throws IOException {
|
||||
RecoveredNMTokensState state = new RecoveredNMTokensState();
|
||||
state.currentMasterKey = getMasterKey(NM_TOKENS_KEY_PREFIX
|
||||
+ CURRENT_MASTER_KEY_SUFFIX);
|
||||
state.previousMasterKey = getMasterKey(NM_TOKENS_KEY_PREFIX
|
||||
+ PREV_MASTER_KEY_SUFFIX);
|
||||
state.it = new NMTokensStateIterator();
|
||||
return state;
|
||||
}
|
||||
|
||||
|
@ -1122,45 +1232,45 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
}
|
||||
}
|
||||
|
||||
// Recover ContainersToken Iterator.
|
||||
private class ContainerTokensStateIterator extends
|
||||
BaseRecoveryIterator<Entry<ContainerId, Long>> {
|
||||
ContainerTokensStateIterator() throws IOException {
|
||||
super(CONTAINER_TOKENS_KEY_PREFIX);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecoveredContainerTokensState loadContainerTokensState()
|
||||
@Override
|
||||
protected Entry<ContainerId, Long> getNextItem(LeveldbIterator it)
|
||||
throws IOException {
|
||||
return getNextContainerToken(it);
|
||||
}
|
||||
}
|
||||
|
||||
private Entry<ContainerId, Long> getNextContainerToken(LeveldbIterator it)
|
||||
throws IOException {
|
||||
RecoveredContainerTokensState state = new RecoveredContainerTokensState();
|
||||
state.activeTokens = new HashMap<ContainerId, Long>();
|
||||
LeveldbIterator iter = null;
|
||||
Entry<ContainerId, Long> containerTokenEntry = null;
|
||||
try {
|
||||
iter = new LeveldbIterator(db);
|
||||
iter.seek(bytes(CONTAINER_TOKENS_KEY_PREFIX));
|
||||
final int containerTokensKeyPrefixLength =
|
||||
CONTAINER_TOKENS_KEY_PREFIX.length();
|
||||
while (iter.hasNext()) {
|
||||
Entry<byte[], byte[]> entry = iter.next();
|
||||
while (it.hasNext()) {
|
||||
Entry<byte[], byte[]> entry = it.next();
|
||||
String fullKey = asString(entry.getKey());
|
||||
if (!fullKey.startsWith(CONTAINER_TOKENS_KEY_PREFIX)) {
|
||||
break;
|
||||
}
|
||||
String key = fullKey.substring(containerTokensKeyPrefixLength);
|
||||
if (key.equals(CURRENT_MASTER_KEY_SUFFIX)) {
|
||||
state.currentMasterKey = parseMasterKey(entry.getValue());
|
||||
} else if (key.equals(PREV_MASTER_KEY_SUFFIX)) {
|
||||
state.previousMasterKey = parseMasterKey(entry.getValue());
|
||||
} else if (key.startsWith(ConverterUtils.CONTAINER_PREFIX)) {
|
||||
loadContainerToken(state, fullKey, key, entry.getValue());
|
||||
String key = fullKey.substring(CONTAINER_TOKENS_KEY_PREFIX.length());
|
||||
if (key.startsWith(ConverterUtils.CONTAINER_PREFIX)) {
|
||||
containerTokenEntry = loadContainerToken(fullKey, key,
|
||||
entry.getValue());
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
if (iter != null) {
|
||||
iter.close();
|
||||
}
|
||||
}
|
||||
return state;
|
||||
return containerTokenEntry;
|
||||
}
|
||||
|
||||
private static void loadContainerToken(RecoveredContainerTokensState state,
|
||||
String key, String containerIdStr, byte[] value) throws IOException {
|
||||
private static Entry<ContainerId, Long> loadContainerToken(String key,
|
||||
String containerIdStr, byte[] value) throws IOException {
|
||||
ContainerId containerId;
|
||||
Long expTime;
|
||||
try {
|
||||
|
@ -1169,7 +1279,19 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
} catch (IllegalArgumentException e) {
|
||||
throw new IOException("Bad container token state for " + key, e);
|
||||
}
|
||||
state.activeTokens.put(containerId, expTime);
|
||||
return new AbstractMap.SimpleEntry<>(containerId, expTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecoveredContainerTokensState loadContainerTokensState()
|
||||
throws IOException {
|
||||
RecoveredContainerTokensState state = new RecoveredContainerTokensState();
|
||||
state.currentMasterKey = getMasterKey(CONTAINER_TOKENS_KEY_PREFIX
|
||||
+ CURRENT_MASTER_KEY_SUFFIX);
|
||||
state.previousMasterKey = getMasterKey(CONTAINER_TOKENS_KEY_PREFIX
|
||||
+ PREV_MASTER_KEY_SUFFIX);
|
||||
state.it = new ContainerTokensStateIterator();
|
||||
return state;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -65,7 +65,7 @@ public class NMNullStateStoreService extends NMStateStoreService {
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<RecoveredContainerState> loadContainersState()
|
||||
public RecoveryIterator<RecoveredContainerState> getContainerStateIterator()
|
||||
throws IOException {
|
||||
throw new UnsupportedOperationException(
|
||||
"Recovery not supported by this state store");
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.ArrayList;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
|
@ -67,12 +68,11 @@ public abstract class NMStateStoreService extends AbstractService {
|
|||
}
|
||||
|
||||
public static class RecoveredApplicationsState {
|
||||
List<ContainerManagerApplicationProto> applications;
|
||||
RecoveryIterator<ContainerManagerApplicationProto> it = null;
|
||||
|
||||
public List<ContainerManagerApplicationProto> getApplications() {
|
||||
return applications;
|
||||
public RecoveryIterator<ContainerManagerApplicationProto> getIterator() {
|
||||
return it;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -106,6 +106,15 @@ public abstract class NMStateStoreService extends AbstractService {
|
|||
RecoveredContainerType.RECOVER;
|
||||
private long startTime;
|
||||
private ResourceMappings resMappings = new ResourceMappings();
|
||||
private final ContainerId containerId;
|
||||
|
||||
RecoveredContainerState(ContainerId containerId){
|
||||
this.containerId = containerId;
|
||||
}
|
||||
|
||||
public ContainerId getContainerId() {
|
||||
return containerId;
|
||||
}
|
||||
|
||||
public RecoveredContainerStatus getStatus() {
|
||||
return status;
|
||||
|
@ -248,30 +257,33 @@ public abstract class NMStateStoreService extends AbstractService {
|
|||
public static class RecoveredLocalizationState {
|
||||
LocalResourceTrackerState publicTrackerState =
|
||||
new LocalResourceTrackerState();
|
||||
Map<String, RecoveredUserResources> userResources =
|
||||
new HashMap<String, RecoveredUserResources>();
|
||||
RecoveryIterator<Entry<String, RecoveredUserResources>> it = null;
|
||||
|
||||
public LocalResourceTrackerState getPublicTrackerState() {
|
||||
return publicTrackerState;
|
||||
}
|
||||
|
||||
public Map<String, RecoveredUserResources> getUserResources() {
|
||||
return userResources;
|
||||
public RecoveryIterator<Entry<String, RecoveredUserResources>> getIterator() {
|
||||
return it;
|
||||
}
|
||||
}
|
||||
|
||||
public static class RecoveredDeletionServiceState {
|
||||
List<DeletionServiceDeleteTaskProto> tasks;
|
||||
RecoveryIterator<DeletionServiceDeleteTaskProto> it = null;
|
||||
|
||||
public List<DeletionServiceDeleteTaskProto> getTasks() {
|
||||
return tasks;
|
||||
public RecoveryIterator<DeletionServiceDeleteTaskProto> getIterator(){
|
||||
return it;
|
||||
}
|
||||
}
|
||||
|
||||
public static class RecoveredNMTokensState {
|
||||
MasterKey currentMasterKey;
|
||||
MasterKey previousMasterKey;
|
||||
Map<ApplicationAttemptId, MasterKey> applicationMasterKeys;
|
||||
RecoveryIterator<Entry<ApplicationAttemptId, MasterKey>> it = null;
|
||||
|
||||
public RecoveryIterator<Entry<ApplicationAttemptId, MasterKey>> getIterator() {
|
||||
return it;
|
||||
}
|
||||
|
||||
public MasterKey getCurrentMasterKey() {
|
||||
return currentMasterKey;
|
||||
|
@ -281,15 +293,16 @@ public abstract class NMStateStoreService extends AbstractService {
|
|||
return previousMasterKey;
|
||||
}
|
||||
|
||||
public Map<ApplicationAttemptId, MasterKey> getApplicationMasterKeys() {
|
||||
return applicationMasterKeys;
|
||||
}
|
||||
}
|
||||
|
||||
public static class RecoveredContainerTokensState {
|
||||
MasterKey currentMasterKey;
|
||||
MasterKey previousMasterKey;
|
||||
Map<ContainerId, Long> activeTokens;
|
||||
RecoveryIterator<Entry<ContainerId, Long>> it = null;
|
||||
|
||||
public RecoveryIterator<Entry<ContainerId, Long>> getIterator() {
|
||||
return it;
|
||||
}
|
||||
|
||||
public MasterKey getCurrentMasterKey() {
|
||||
return currentMasterKey;
|
||||
|
@ -299,9 +312,6 @@ public abstract class NMStateStoreService extends AbstractService {
|
|||
return previousMasterKey;
|
||||
}
|
||||
|
||||
public Map<ContainerId, Long> getActiveTokens() {
|
||||
return activeTokens;
|
||||
}
|
||||
}
|
||||
|
||||
public static class RecoveredLogDeleterState {
|
||||
|
@ -400,11 +410,10 @@ public abstract class NMStateStoreService extends AbstractService {
|
|||
|
||||
|
||||
/**
|
||||
* Load the state of containers
|
||||
* @return recovered state for containers
|
||||
* @throws IOException
|
||||
* Get the recovered container state iterator.
|
||||
* @return recovery iterator
|
||||
*/
|
||||
public abstract List<RecoveredContainerState> loadContainersState()
|
||||
public abstract RecoveryIterator<RecoveredContainerState> getContainerStateIterator()
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.recovery;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.NoSuchElementException;
|
||||
|
||||
/**
 * A closeable, forward-only iterator used when recovering state.
 *
 * <p>It mirrors {@link java.util.Iterator}, except that both
 * {@link #hasNext()} and {@link #next()} are allowed to throw
 * {@link IOException}. This lets an implementation translate the raw
 * RuntimeExceptions that an underlying iterator can throw into IOException.
 * Because the interface extends {@link Closeable}, callers should iterate
 * inside a try-with-resources statement so the iterator is always closed.
 *
 * @param <T> the type of element returned by this iterator
 */
public interface RecoveryIterator<T> extends Closeable {

  /**
   * Returns true if the iteration has more elements.
   *
   * @return {@code true} if a call to {@link #next()} would return an
   *         element rather than fail
   * @throws IOException if the underlying source cannot be read
   */
  boolean hasNext() throws IOException;

  /**
   * Returns the next element in the iteration.
   *
   * @return the next element
   * @throws IOException if the underlying source cannot be read
   * @throws NoSuchElementException if the iteration has no more elements
   */
  T next() throws IOException, NoSuchElementException;

}
|
|
@ -24,6 +24,8 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -90,17 +92,20 @@ public class NMContainerTokenSecretManager extends
|
|||
super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1;
|
||||
}
|
||||
|
||||
for (Entry<ContainerId, Long> entry : state.getActiveTokens().entrySet()) {
|
||||
ContainerId containerId = entry.getKey();
|
||||
Long expTime = entry.getValue();
|
||||
List<ContainerId> containerList =
|
||||
recentlyStartedContainerTracker.get(expTime);
|
||||
if (containerList == null) {
|
||||
containerList = new ArrayList<ContainerId>();
|
||||
recentlyStartedContainerTracker.put(expTime, containerList);
|
||||
}
|
||||
if (!containerList.contains(containerId)) {
|
||||
containerList.add(containerId);
|
||||
try (RecoveryIterator<Entry<ContainerId, Long>> it = state.getIterator()) {
|
||||
while (it.hasNext()) {
|
||||
Entry<ContainerId, Long> entry = it.next();
|
||||
ContainerId containerId = entry.getKey();
|
||||
Long expTime = entry.getValue();
|
||||
List<ContainerId> containerList =
|
||||
recentlyStartedContainerTracker.get(expTime);
|
||||
if (containerList == null) {
|
||||
containerList = new ArrayList<ContainerId>();
|
||||
recentlyStartedContainerTracker.put(expTime, containerList);
|
||||
}
|
||||
if (!containerList.contains(containerId)) {
|
||||
containerList.add(containerId);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,8 @@ import java.util.ArrayList;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -87,11 +89,14 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager {
|
|||
super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1;
|
||||
}
|
||||
|
||||
for (Map.Entry<ApplicationAttemptId, MasterKey> entry :
|
||||
state.getApplicationMasterKeys().entrySet()) {
|
||||
key = entry.getValue();
|
||||
oldMasterKeys.put(entry.getKey(),
|
||||
new MasterKeyData(key, createSecretKey(key.getBytes().array())));
|
||||
try (RecoveryIterator<Map.Entry<ApplicationAttemptId, MasterKey>> it =
|
||||
state.getIterator()) {
|
||||
while (it.hasNext()) {
|
||||
Map.Entry<ApplicationAttemptId, MasterKey> entry = it.next();
|
||||
key = entry.getValue();
|
||||
oldMasterKeys.put(entry.getKey(),
|
||||
new MasterKeyData(key, createSecretKey(key.getBytes().array())));
|
||||
}
|
||||
}
|
||||
|
||||
// reconstruct app to app attempts map
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.io.Serializable;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -56,6 +57,8 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
|
|||
private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
|
||||
private RecoveredNMTokensState nmTokenState;
|
||||
private RecoveredContainerTokensState containerTokenState;
|
||||
private Map<ApplicationAttemptId, MasterKey> applicationMasterKeys;
|
||||
private Map<ContainerId, Long> activeTokens;
|
||||
private Map<ApplicationId, LogDeleterProto> logDeleterState;
|
||||
private RecoveredAMRMProxyState amrmProxyState;
|
||||
|
||||
|
@ -68,10 +71,9 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
|
|||
apps = new HashMap<ApplicationId, ContainerManagerApplicationProto>();
|
||||
containerStates = new HashMap<ContainerId, RecoveredContainerState>();
|
||||
nmTokenState = new RecoveredNMTokensState();
|
||||
nmTokenState.applicationMasterKeys =
|
||||
new HashMap<ApplicationAttemptId, MasterKey>();
|
||||
applicationMasterKeys = new HashMap<ApplicationAttemptId, MasterKey>();
|
||||
containerTokenState = new RecoveredContainerTokensState();
|
||||
containerTokenState.activeTokens = new HashMap<ContainerId, Long>();
|
||||
activeTokens = new HashMap<ContainerId, Long>();
|
||||
trackerStates = new HashMap<TrackerKey, TrackerState>();
|
||||
deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
|
||||
logDeleterState = new HashMap<ApplicationId, LogDeleterProto>();
|
||||
|
@ -86,13 +88,39 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
|
|||
protected void closeStorage() {
|
||||
}
|
||||
|
||||
// Recovery Iterator Implementation.
|
||||
private class NMMemoryRecoveryIterator<T> implements RecoveryIterator<T> {
|
||||
|
||||
private Iterator<T> it;
|
||||
|
||||
NMMemoryRecoveryIterator(Iterator<T> it){
|
||||
this.it = it;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return it.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public T next() throws IOException {
|
||||
return it.next();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized RecoveredApplicationsState loadApplicationsState()
|
||||
throws IOException {
|
||||
RecoveredApplicationsState state = new RecoveredApplicationsState();
|
||||
state.applications = new ArrayList<ContainerManagerApplicationProto>(
|
||||
apps.values());
|
||||
List<ContainerManagerApplicationProto> containerList =
|
||||
new ArrayList<ContainerManagerApplicationProto>(apps.values());
|
||||
state.it = new NMMemoryRecoveryIterator<ContainerManagerApplicationProto>(
|
||||
containerList.iterator());
|
||||
return state;
|
||||
}
|
||||
|
||||
|
@ -111,13 +139,13 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized List<RecoveredContainerState> loadContainersState()
|
||||
public RecoveryIterator<RecoveredContainerState> getContainerStateIterator()
|
||||
throws IOException {
|
||||
// return a copy so caller can't modify our state
|
||||
List<RecoveredContainerState> result =
|
||||
new ArrayList<RecoveredContainerState>(containerStates.size());
|
||||
for (RecoveredContainerState rcs : containerStates.values()) {
|
||||
RecoveredContainerState rcsCopy = new RecoveredContainerState();
|
||||
RecoveredContainerState rcsCopy = new RecoveredContainerState(rcs.getContainerId());
|
||||
rcsCopy.status = rcs.status;
|
||||
rcsCopy.exitCode = rcs.exitCode;
|
||||
rcsCopy.killed = rcs.killed;
|
||||
|
@ -131,13 +159,14 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
|
|||
rcsCopy.setResourceMappings(rcs.getResourceMappings());
|
||||
result.add(rcsCopy);
|
||||
}
|
||||
return result;
|
||||
return new NMMemoryRecoveryIterator<RecoveredContainerState>(
|
||||
result.iterator());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void storeContainer(ContainerId containerId,
|
||||
int version, long startTime, StartContainerRequest startRequest) {
|
||||
RecoveredContainerState rcs = new RecoveredContainerState();
|
||||
RecoveredContainerState rcs = new RecoveredContainerState(containerId);
|
||||
rcs.startRequest = startRequest;
|
||||
rcs.version = version;
|
||||
try {
|
||||
|
@ -284,6 +313,8 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
|
|||
@Override
|
||||
public synchronized RecoveredLocalizationState loadLocalizationState() {
|
||||
RecoveredLocalizationState result = new RecoveredLocalizationState();
|
||||
Map<String, RecoveredUserResources> userResources =
|
||||
new HashMap<String, RecoveredUserResources>();
|
||||
for (Map.Entry<TrackerKey, TrackerState> e : trackerStates.entrySet()) {
|
||||
TrackerKey tk = e.getKey();
|
||||
TrackerState ts = e.getValue();
|
||||
|
@ -294,10 +325,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
|
|||
if (tk.user == null) {
|
||||
result.publicTrackerState = loadTrackerState(ts);
|
||||
} else {
|
||||
RecoveredUserResources rur = result.userResources.get(tk.user);
|
||||
RecoveredUserResources rur = userResources.get(tk.user);
|
||||
if (rur == null) {
|
||||
rur = new RecoveredUserResources();
|
||||
result.userResources.put(tk.user, rur);
|
||||
userResources.put(tk.user, rur);
|
||||
}
|
||||
if (tk.appId == null) {
|
||||
rur.privateTrackerState = loadTrackerState(ts);
|
||||
|
@ -306,6 +337,8 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
|
|||
}
|
||||
}
|
||||
}
|
||||
result.it = new NMMemoryRecoveryIterator<Map.Entry<String, RecoveredUserResources>>(
|
||||
userResources.entrySet().iterator());
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -341,8 +374,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
|
|||
throws IOException {
|
||||
RecoveredDeletionServiceState result =
|
||||
new RecoveredDeletionServiceState();
|
||||
result.tasks = new ArrayList<DeletionServiceDeleteTaskProto>(
|
||||
deleteTasks.values());
|
||||
List<DeletionServiceDeleteTaskProto> deleteTaskProtos =
|
||||
new ArrayList<DeletionServiceDeleteTaskProto>(deleteTasks.values());
|
||||
result.it = new NMMemoryRecoveryIterator<DeletionServiceDeleteTaskProto>(
|
||||
deleteTaskProtos.iterator());
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -365,9 +400,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
|
|||
RecoveredNMTokensState result = new RecoveredNMTokensState();
|
||||
result.currentMasterKey = nmTokenState.currentMasterKey;
|
||||
result.previousMasterKey = nmTokenState.previousMasterKey;
|
||||
result.applicationMasterKeys =
|
||||
new HashMap<ApplicationAttemptId, MasterKey>(
|
||||
nmTokenState.applicationMasterKeys);
|
||||
Map<ApplicationAttemptId, MasterKey> masterKeysMap =
|
||||
new HashMap<ApplicationAttemptId, MasterKey>(applicationMasterKeys);
|
||||
result.it = new NMMemoryRecoveryIterator<Map.Entry<ApplicationAttemptId, MasterKey>>(
|
||||
masterKeysMap.entrySet().iterator());
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -389,14 +425,14 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
|
|||
public synchronized void storeNMTokenApplicationMasterKey(
|
||||
ApplicationAttemptId attempt, MasterKey key) throws IOException {
|
||||
MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
|
||||
nmTokenState.applicationMasterKeys.put(attempt,
|
||||
applicationMasterKeys.put(attempt,
|
||||
new MasterKeyPBImpl(keypb.getProto()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void removeNMTokenApplicationMasterKey(
|
||||
ApplicationAttemptId attempt) throws IOException {
|
||||
nmTokenState.applicationMasterKeys.remove(attempt);
|
||||
applicationMasterKeys.remove(attempt);
|
||||
}
|
||||
|
||||
|
||||
|
@ -408,8 +444,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
|
|||
new RecoveredContainerTokensState();
|
||||
result.currentMasterKey = containerTokenState.currentMasterKey;
|
||||
result.previousMasterKey = containerTokenState.previousMasterKey;
|
||||
result.activeTokens =
|
||||
new HashMap<ContainerId, Long>(containerTokenState.activeTokens);
|
||||
Map<ContainerId, Long> containersTokenMap =
|
||||
new HashMap<ContainerId, Long>(activeTokens);
|
||||
result.it = new NMMemoryRecoveryIterator<Map.Entry<ContainerId, Long>>(
|
||||
containersTokenMap.entrySet().iterator());
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -432,13 +470,13 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
|
|||
@Override
|
||||
public synchronized void storeContainerToken(ContainerId containerId,
|
||||
Long expirationTime) throws IOException {
|
||||
containerTokenState.activeTokens.put(containerId, expirationTime);
|
||||
activeTokens.put(containerId, expirationTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void removeContainerToken(ContainerId containerId)
|
||||
throws IOException {
|
||||
containerTokenState.activeTokens.remove(containerId);
|
||||
activeTokens.remove(containerId);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -125,6 +125,73 @@ public class TestNMLeveldbStateStoreService {
|
|||
FileUtil.fullyDelete(TMP_DIR);
|
||||
}
|
||||
|
||||
private List<RecoveredContainerState> loadContainersState(
|
||||
RecoveryIterator<RecoveredContainerState> it) throws IOException {
|
||||
List<RecoveredContainerState> containers =
|
||||
new ArrayList<RecoveredContainerState>();
|
||||
while (it.hasNext()) {
|
||||
RecoveredContainerState rcs = it.next();
|
||||
containers.add(rcs);
|
||||
}
|
||||
return containers;
|
||||
}
|
||||
|
||||
private List<ContainerManagerApplicationProto> loadApplicationProtos(
|
||||
RecoveryIterator<ContainerManagerApplicationProto> it)
|
||||
throws IOException {
|
||||
List<ContainerManagerApplicationProto> applicationProtos =
|
||||
new ArrayList<ContainerManagerApplicationProto>();
|
||||
while (it.hasNext()) {
|
||||
applicationProtos.add(it.next());
|
||||
}
|
||||
return applicationProtos;
|
||||
}
|
||||
|
||||
private List<DeletionServiceDeleteTaskProto> loadDeletionTaskProtos(
|
||||
RecoveryIterator<DeletionServiceDeleteTaskProto> it) throws IOException {
|
||||
List<DeletionServiceDeleteTaskProto> deleteTaskProtos =
|
||||
new ArrayList<DeletionServiceDeleteTaskProto>();
|
||||
while (it.hasNext()) {
|
||||
deleteTaskProtos.add(it.next());
|
||||
}
|
||||
return deleteTaskProtos;
|
||||
}
|
||||
|
||||
private Map<String, RecoveredUserResources> loadUserResources(
|
||||
RecoveryIterator<Map.Entry<String, RecoveredUserResources>> it)
|
||||
throws IOException {
|
||||
Map<String, RecoveredUserResources> userResources =
|
||||
new HashMap<String, RecoveredUserResources>();
|
||||
while (it.hasNext()) {
|
||||
Map.Entry<String, RecoveredUserResources> entry = it.next();
|
||||
userResources.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
return userResources;
|
||||
}
|
||||
|
||||
private Map<ApplicationAttemptId, MasterKey> loadNMTokens(
|
||||
RecoveryIterator<Map.Entry<ApplicationAttemptId, MasterKey>> it)
|
||||
throws IOException {
|
||||
Map<ApplicationAttemptId, MasterKey> nmTokens =
|
||||
new HashMap<ApplicationAttemptId, MasterKey>();
|
||||
while (it.hasNext()) {
|
||||
Map.Entry<ApplicationAttemptId, MasterKey> entry = it.next();
|
||||
nmTokens.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
return nmTokens;
|
||||
}
|
||||
|
||||
private Map<ContainerId, Long> loadContainerTokens(
|
||||
RecoveryIterator<Map.Entry<ContainerId, Long>> it) throws IOException {
|
||||
Map<ContainerId, Long> containerTokens =
|
||||
new HashMap<ContainerId, Long>();
|
||||
while (it.hasNext()) {
|
||||
Map.Entry<ContainerId, Long> entry = it.next();
|
||||
containerTokens.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
return containerTokens;
|
||||
}
|
||||
|
||||
private void restartStateStore() throws IOException {
|
||||
// need to close so leveldb releases database lock
|
||||
if (stateStore != null) {
|
||||
|
@ -142,7 +209,7 @@ public class TestNMLeveldbStateStoreService {
|
|||
assertNotNull(pubts);
|
||||
assertTrue(pubts.getLocalizedResources().isEmpty());
|
||||
assertTrue(pubts.getInProgressResources().isEmpty());
|
||||
assertTrue(state.getUserResources().isEmpty());
|
||||
assertTrue(loadUserResources(state.getIterator()).isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -183,7 +250,7 @@ public class TestNMLeveldbStateStoreService {
|
|||
restartStateStore();
|
||||
Assert.fail("Incompatible version, should expect fail here.");
|
||||
} catch (ServiceStateException e) {
|
||||
Assert.assertTrue("Exception message mismatch",
|
||||
Assert.assertTrue("Exception message mismatch",
|
||||
e.getMessage().contains("Incompatible version for NM state:"));
|
||||
}
|
||||
}
|
||||
|
@ -192,7 +259,9 @@ public class TestNMLeveldbStateStoreService {
|
|||
public void testApplicationStorage() throws IOException {
|
||||
// test empty when no state
|
||||
RecoveredApplicationsState state = stateStore.loadApplicationsState();
|
||||
assertTrue(state.getApplications().isEmpty());
|
||||
List<ContainerManagerApplicationProto> apps =
|
||||
loadApplicationProtos(state.getIterator());
|
||||
assertTrue(apps.isEmpty());
|
||||
|
||||
// store an application and verify recovered
|
||||
final ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
|
||||
|
@ -204,8 +273,9 @@ public class TestNMLeveldbStateStoreService {
|
|||
stateStore.storeApplication(appId1, appProto1);
|
||||
restartStateStore();
|
||||
state = stateStore.loadApplicationsState();
|
||||
assertEquals(1, state.getApplications().size());
|
||||
assertEquals(appProto1, state.getApplications().get(0));
|
||||
apps = loadApplicationProtos(state.getIterator());
|
||||
assertEquals(1, apps.size());
|
||||
assertEquals(appProto1, apps.get(0));
|
||||
|
||||
// add a new app
|
||||
final ApplicationId appId2 = ApplicationId.newInstance(1234, 2);
|
||||
|
@ -216,23 +286,25 @@ public class TestNMLeveldbStateStoreService {
|
|||
stateStore.storeApplication(appId2, appProto2);
|
||||
restartStateStore();
|
||||
state = stateStore.loadApplicationsState();
|
||||
assertEquals(2, state.getApplications().size());
|
||||
assertTrue(state.getApplications().contains(appProto1));
|
||||
assertTrue(state.getApplications().contains(appProto2));
|
||||
apps = loadApplicationProtos(state.getIterator());
|
||||
assertEquals(2, apps.size());
|
||||
assertTrue(apps.contains(appProto1));
|
||||
assertTrue(apps.contains(appProto2));
|
||||
|
||||
// test removing an application
|
||||
stateStore.removeApplication(appId2);
|
||||
restartStateStore();
|
||||
state = stateStore.loadApplicationsState();
|
||||
assertEquals(1, state.getApplications().size());
|
||||
assertEquals(appProto1, state.getApplications().get(0));
|
||||
apps = loadApplicationProtos(state.getIterator());
|
||||
assertEquals(1, apps.size());
|
||||
assertEquals(appProto1, apps.get(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContainerStorage() throws IOException {
|
||||
// test empty when no state
|
||||
List<RecoveredContainerState> recoveredContainers =
|
||||
stateStore.loadContainersState();
|
||||
loadContainersState(stateStore.getContainerStateIterator());
|
||||
assertTrue(recoveredContainers.isEmpty());
|
||||
|
||||
// create a container request
|
||||
|
@ -254,7 +326,8 @@ public class TestNMLeveldbStateStoreService {
|
|||
stateStore.getContainerVersionKey(containerId.toString()))));
|
||||
|
||||
restartStateStore();
|
||||
recoveredContainers = stateStore.loadContainersState();
|
||||
recoveredContainers =
|
||||
loadContainersState(stateStore.getContainerStateIterator());
|
||||
assertEquals(1, recoveredContainers.size());
|
||||
RecoveredContainerState rcs = recoveredContainers.get(0);
|
||||
assertEquals(0, rcs.getVersion());
|
||||
|
@ -269,14 +342,16 @@ public class TestNMLeveldbStateStoreService {
|
|||
// store a new container record without StartContainerRequest
|
||||
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6);
|
||||
stateStore.storeContainerLaunched(containerId1);
|
||||
recoveredContainers = stateStore.loadContainersState();
|
||||
recoveredContainers =
|
||||
loadContainersState(stateStore.getContainerStateIterator());
|
||||
// check whether the new container record is discarded
|
||||
assertEquals(1, recoveredContainers.size());
|
||||
|
||||
// queue the container, and verify recovered
|
||||
stateStore.storeContainerQueued(containerId);
|
||||
restartStateStore();
|
||||
recoveredContainers = stateStore.loadContainersState();
|
||||
recoveredContainers =
|
||||
loadContainersState(stateStore.getContainerStateIterator());
|
||||
assertEquals(1, recoveredContainers.size());
|
||||
rcs = recoveredContainers.get(0);
|
||||
assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus());
|
||||
|
@ -292,7 +367,8 @@ public class TestNMLeveldbStateStoreService {
|
|||
diags.append("some diags for container");
|
||||
stateStore.storeContainerDiagnostics(containerId, diags);
|
||||
restartStateStore();
|
||||
recoveredContainers = stateStore.loadContainersState();
|
||||
recoveredContainers =
|
||||
loadContainersState(stateStore.getContainerStateIterator());
|
||||
assertEquals(1, recoveredContainers.size());
|
||||
rcs = recoveredContainers.get(0);
|
||||
assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
|
||||
|
@ -305,7 +381,8 @@ public class TestNMLeveldbStateStoreService {
|
|||
// pause the container, and verify recovered
|
||||
stateStore.storeContainerPaused(containerId);
|
||||
restartStateStore();
|
||||
recoveredContainers = stateStore.loadContainersState();
|
||||
recoveredContainers =
|
||||
loadContainersState(stateStore.getContainerStateIterator());
|
||||
assertEquals(1, recoveredContainers.size());
|
||||
rcs = recoveredContainers.get(0);
|
||||
assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus());
|
||||
|
@ -316,7 +393,8 @@ public class TestNMLeveldbStateStoreService {
|
|||
// Resume the container
|
||||
stateStore.removeContainerPaused(containerId);
|
||||
restartStateStore();
|
||||
recoveredContainers = stateStore.loadContainersState();
|
||||
recoveredContainers =
|
||||
loadContainersState(stateStore.getContainerStateIterator());
|
||||
assertEquals(1, recoveredContainers.size());
|
||||
|
||||
// increase the container size, and verify recovered
|
||||
|
@ -328,7 +406,8 @@ public class TestNMLeveldbStateStoreService {
|
|||
stateStore
|
||||
.storeContainerUpdateToken(containerId, updateTokenIdentifier);
|
||||
restartStateStore();
|
||||
recoveredContainers = stateStore.loadContainersState();
|
||||
recoveredContainers =
|
||||
loadContainersState(stateStore.getContainerStateIterator());
|
||||
assertEquals(1, recoveredContainers.size());
|
||||
rcs = recoveredContainers.get(0);
|
||||
assertEquals(0, rcs.getVersion());
|
||||
|
@ -342,7 +421,8 @@ public class TestNMLeveldbStateStoreService {
|
|||
stateStore.storeContainerDiagnostics(containerId, diags);
|
||||
stateStore.storeContainerKilled(containerId);
|
||||
restartStateStore();
|
||||
recoveredContainers = stateStore.loadContainersState();
|
||||
recoveredContainers =
|
||||
loadContainersState(stateStore.getContainerStateIterator());
|
||||
assertEquals(1, recoveredContainers.size());
|
||||
rcs = recoveredContainers.get(0);
|
||||
assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
|
||||
|
@ -358,7 +438,8 @@ public class TestNMLeveldbStateStoreService {
|
|||
stateStore.storeContainerDiagnostics(containerId, diags);
|
||||
stateStore.storeContainerCompleted(containerId, 21);
|
||||
restartStateStore();
|
||||
recoveredContainers = stateStore.loadContainersState();
|
||||
recoveredContainers =
|
||||
loadContainersState(stateStore.getContainerStateIterator());
|
||||
assertEquals(1, recoveredContainers.size());
|
||||
rcs = recoveredContainers.get(0);
|
||||
assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus());
|
||||
|
@ -371,7 +452,8 @@ public class TestNMLeveldbStateStoreService {
|
|||
stateStore.storeContainerWorkDir(containerId, "/test/workdir");
|
||||
stateStore.storeContainerLogDir(containerId, "/test/logdir");
|
||||
restartStateStore();
|
||||
recoveredContainers = stateStore.loadContainersState();
|
||||
recoveredContainers =
|
||||
loadContainersState(stateStore.getContainerStateIterator());
|
||||
assertEquals(1, recoveredContainers.size());
|
||||
rcs = recoveredContainers.get(0);
|
||||
assertEquals(6, rcs.getRemainingRetryAttempts());
|
||||
|
@ -382,12 +464,13 @@ public class TestNMLeveldbStateStoreService {
|
|||
// remove the container and verify not recovered
|
||||
stateStore.removeContainer(containerId);
|
||||
restartStateStore();
|
||||
recoveredContainers = stateStore.loadContainersState();
|
||||
recoveredContainers =
|
||||
loadContainersState(stateStore.getContainerStateIterator());
|
||||
assertTrue(recoveredContainers.isEmpty());
|
||||
// recover again to check remove clears all containers
|
||||
restartStateStore();
|
||||
NMStateStoreService nmStoreSpy = spy(stateStore);
|
||||
nmStoreSpy.loadContainersState();
|
||||
loadContainersState(nmStoreSpy.getContainerStateIterator());
|
||||
verify(nmStoreSpy,times(0)).removeContainer(any(ContainerId.class));
|
||||
}
|
||||
|
||||
|
@ -399,7 +482,8 @@ public class TestNMLeveldbStateStoreService {
|
|||
stateStore.storeContainerRestartTimes(containerId,
|
||||
finishTimeForRetryAttempts);
|
||||
restartStateStore();
|
||||
RecoveredContainerState rcs = stateStore.loadContainersState().get(0);
|
||||
RecoveredContainerState rcs =
|
||||
loadContainersState(stateStore.getContainerStateIterator()).get(0);
|
||||
List<Long> recoveredRestartTimes = rcs.getRestartTimes();
|
||||
assertEquals(1462700529039L, (long)recoveredRestartTimes.get(0));
|
||||
assertEquals(1462700529050L, (long)recoveredRestartTimes.get(1));
|
||||
|
@ -481,7 +565,7 @@ public class TestNMLeveldbStateStoreService {
|
|||
assertTrue(pubts.getLocalizedResources().isEmpty());
|
||||
assertTrue(pubts.getInProgressResources().isEmpty());
|
||||
Map<String, RecoveredUserResources> userResources =
|
||||
state.getUserResources();
|
||||
loadUserResources(state.getIterator());
|
||||
assertEquals(1, userResources.size());
|
||||
RecoveredUserResources rur = userResources.get(user);
|
||||
LocalResourceTrackerState privts = rur.getPrivateTrackerState();
|
||||
|
@ -535,7 +619,7 @@ public class TestNMLeveldbStateStoreService {
|
|||
pubts.getInProgressResources().get(pubRsrcProto1));
|
||||
assertEquals(pubRsrcLocalPath2,
|
||||
pubts.getInProgressResources().get(pubRsrcProto2));
|
||||
userResources = state.getUserResources();
|
||||
userResources = loadUserResources(state.getIterator());
|
||||
assertEquals(1, userResources.size());
|
||||
rur = userResources.get(user);
|
||||
privts = rur.getPrivateTrackerState();
|
||||
|
@ -584,7 +668,7 @@ public class TestNMLeveldbStateStoreService {
|
|||
assertTrue(pubts.getLocalizedResources().isEmpty());
|
||||
assertTrue(pubts.getInProgressResources().isEmpty());
|
||||
Map<String, RecoveredUserResources> userResources =
|
||||
state.getUserResources();
|
||||
loadUserResources(state.getIterator());
|
||||
assertEquals(1, userResources.size());
|
||||
RecoveredUserResources rur = userResources.get(user);
|
||||
LocalResourceTrackerState privts = rur.getPrivateTrackerState();
|
||||
|
@ -654,7 +738,7 @@ public class TestNMLeveldbStateStoreService {
|
|||
assertEquals(1, pubts.getInProgressResources().size());
|
||||
assertEquals(pubRsrcLocalPath2,
|
||||
pubts.getInProgressResources().get(pubRsrcProto2));
|
||||
userResources = state.getUserResources();
|
||||
userResources = loadUserResources(state.getIterator());
|
||||
assertEquals(1, userResources.size());
|
||||
rur = userResources.get(user);
|
||||
privts = rur.getPrivateTrackerState();
|
||||
|
@ -762,7 +846,7 @@ public class TestNMLeveldbStateStoreService {
|
|||
assertEquals(pubLocalizedProto1,
|
||||
pubts.getLocalizedResources().iterator().next());
|
||||
Map<String, RecoveredUserResources> userResources =
|
||||
state.getUserResources();
|
||||
loadUserResources(state.getIterator());
|
||||
assertTrue(userResources.isEmpty());
|
||||
}
|
||||
|
||||
|
@ -771,7 +855,9 @@ public class TestNMLeveldbStateStoreService {
|
|||
// test empty when no state
|
||||
RecoveredDeletionServiceState state =
|
||||
stateStore.loadDeletionServiceState();
|
||||
assertTrue(state.getTasks().isEmpty());
|
||||
List<DeletionServiceDeleteTaskProto> deleteTaskProtos =
|
||||
loadDeletionTaskProtos(state.getIterator());
|
||||
assertTrue(deleteTaskProtos.isEmpty());
|
||||
|
||||
// store a deletion task and verify recovered
|
||||
DeletionServiceDeleteTaskProto proto =
|
||||
|
@ -788,8 +874,9 @@ public class TestNMLeveldbStateStoreService {
|
|||
stateStore.storeDeletionTask(proto.getId(), proto);
|
||||
restartStateStore();
|
||||
state = stateStore.loadDeletionServiceState();
|
||||
assertEquals(1, state.getTasks().size());
|
||||
assertEquals(proto, state.getTasks().get(0));
|
||||
deleteTaskProtos = loadDeletionTaskProtos(state.getIterator());
|
||||
assertEquals(1, deleteTaskProtos.size());
|
||||
assertEquals(proto, deleteTaskProtos.get(0));
|
||||
|
||||
// store another deletion task
|
||||
DeletionServiceDeleteTaskProto proto2 =
|
||||
|
@ -802,31 +889,36 @@ public class TestNMLeveldbStateStoreService {
|
|||
stateStore.storeDeletionTask(proto2.getId(), proto2);
|
||||
restartStateStore();
|
||||
state = stateStore.loadDeletionServiceState();
|
||||
assertEquals(2, state.getTasks().size());
|
||||
assertTrue(state.getTasks().contains(proto));
|
||||
assertTrue(state.getTasks().contains(proto2));
|
||||
deleteTaskProtos = loadDeletionTaskProtos(state.getIterator());
|
||||
assertEquals(2, deleteTaskProtos.size());
|
||||
assertTrue(deleteTaskProtos.contains(proto));
|
||||
assertTrue(deleteTaskProtos.contains(proto2));
|
||||
|
||||
|
||||
// delete a task and verify gone after recovery
|
||||
stateStore.removeDeletionTask(proto2.getId());
|
||||
restartStateStore();
|
||||
state = stateStore.loadDeletionServiceState();
|
||||
assertEquals(1, state.getTasks().size());
|
||||
assertEquals(proto, state.getTasks().get(0));
|
||||
state = stateStore.loadDeletionServiceState();
|
||||
deleteTaskProtos = loadDeletionTaskProtos(state.getIterator());
|
||||
assertEquals(1, deleteTaskProtos.size());
|
||||
assertEquals(proto, deleteTaskProtos.get(0));
|
||||
|
||||
// delete the last task and verify none left
|
||||
stateStore.removeDeletionTask(proto.getId());
|
||||
restartStateStore();
|
||||
state = stateStore.loadDeletionServiceState();
|
||||
assertTrue(state.getTasks().isEmpty());
|
||||
}
|
||||
deleteTaskProtos = loadDeletionTaskProtos(state.getIterator());
|
||||
assertTrue(deleteTaskProtos.isEmpty()); }
|
||||
|
||||
@Test
|
||||
public void testNMTokenStorage() throws IOException {
|
||||
// test empty when no state
|
||||
RecoveredNMTokensState state = stateStore.loadNMTokensState();
|
||||
Map<ApplicationAttemptId, MasterKey> loadedAppKeys =
|
||||
loadNMTokens(state.getIterator());
|
||||
assertNull(state.getCurrentMasterKey());
|
||||
assertNull(state.getPreviousMasterKey());
|
||||
assertTrue(state.getApplicationMasterKeys().isEmpty());
|
||||
assertTrue(loadedAppKeys.isEmpty());
|
||||
|
||||
// store a master key and verify recovered
|
||||
NMTokenSecretManagerForTest secretMgr = new NMTokenSecretManagerForTest();
|
||||
|
@ -834,18 +926,20 @@ public class TestNMLeveldbStateStoreService {
|
|||
stateStore.storeNMTokenCurrentMasterKey(currentKey);
|
||||
restartStateStore();
|
||||
state = stateStore.loadNMTokensState();
|
||||
loadedAppKeys = loadNMTokens(state.getIterator());
|
||||
assertEquals(currentKey, state.getCurrentMasterKey());
|
||||
assertNull(state.getPreviousMasterKey());
|
||||
assertTrue(state.getApplicationMasterKeys().isEmpty());
|
||||
assertTrue(loadedAppKeys.isEmpty());
|
||||
|
||||
// store a previous key and verify recovered
|
||||
MasterKey prevKey = secretMgr.generateKey();
|
||||
stateStore.storeNMTokenPreviousMasterKey(prevKey);
|
||||
restartStateStore();
|
||||
state = stateStore.loadNMTokensState();
|
||||
loadedAppKeys = loadNMTokens(state.getIterator());
|
||||
assertEquals(currentKey, state.getCurrentMasterKey());
|
||||
assertEquals(prevKey, state.getPreviousMasterKey());
|
||||
assertTrue(state.getApplicationMasterKeys().isEmpty());
|
||||
assertTrue(loadedAppKeys.isEmpty());
|
||||
|
||||
// store a few application keys and verify recovered
|
||||
ApplicationAttemptId attempt1 = ApplicationAttemptId.newInstance(
|
||||
|
@ -858,10 +952,9 @@ public class TestNMLeveldbStateStoreService {
|
|||
stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2);
|
||||
restartStateStore();
|
||||
state = stateStore.loadNMTokensState();
|
||||
loadedAppKeys = loadNMTokens(state.getIterator());
|
||||
assertEquals(currentKey, state.getCurrentMasterKey());
|
||||
assertEquals(prevKey, state.getPreviousMasterKey());
|
||||
Map<ApplicationAttemptId, MasterKey> loadedAppKeys =
|
||||
state.getApplicationMasterKeys();
|
||||
assertEquals(2, loadedAppKeys.size());
|
||||
assertEquals(attemptKey1, loadedAppKeys.get(attempt1));
|
||||
assertEquals(attemptKey2, loadedAppKeys.get(attempt2));
|
||||
|
@ -880,9 +973,9 @@ public class TestNMLeveldbStateStoreService {
|
|||
stateStore.storeNMTokenCurrentMasterKey(currentKey);
|
||||
restartStateStore();
|
||||
state = stateStore.loadNMTokensState();
|
||||
loadedAppKeys = loadNMTokens(state.getIterator());
|
||||
assertEquals(currentKey, state.getCurrentMasterKey());
|
||||
assertEquals(prevKey, state.getPreviousMasterKey());
|
||||
loadedAppKeys = state.getApplicationMasterKeys();
|
||||
assertEquals(2, loadedAppKeys.size());
|
||||
assertNull(loadedAppKeys.get(attempt1));
|
||||
assertEquals(attemptKey2, loadedAppKeys.get(attempt2));
|
||||
|
@ -894,9 +987,10 @@ public class TestNMLeveldbStateStoreService {
|
|||
// test empty when no state
|
||||
RecoveredContainerTokensState state =
|
||||
stateStore.loadContainerTokensState();
|
||||
Map<ContainerId, Long> loadedActiveTokens = loadContainerTokens(state.it);
|
||||
assertNull(state.getCurrentMasterKey());
|
||||
assertNull(state.getPreviousMasterKey());
|
||||
assertTrue(state.getActiveTokens().isEmpty());
|
||||
assertTrue(loadedActiveTokens.isEmpty());
|
||||
|
||||
// store a master key and verify recovered
|
||||
ContainerTokenKeyGeneratorForTest keygen =
|
||||
|
@ -905,18 +999,20 @@ public class TestNMLeveldbStateStoreService {
|
|||
stateStore.storeContainerTokenCurrentMasterKey(currentKey);
|
||||
restartStateStore();
|
||||
state = stateStore.loadContainerTokensState();
|
||||
loadedActiveTokens = loadContainerTokens(state.it);
|
||||
assertEquals(currentKey, state.getCurrentMasterKey());
|
||||
assertNull(state.getPreviousMasterKey());
|
||||
assertTrue(state.getActiveTokens().isEmpty());
|
||||
assertTrue(loadedActiveTokens.isEmpty());
|
||||
|
||||
// store a previous key and verify recovered
|
||||
MasterKey prevKey = keygen.generateKey();
|
||||
stateStore.storeContainerTokenPreviousMasterKey(prevKey);
|
||||
restartStateStore();
|
||||
state = stateStore.loadContainerTokensState();
|
||||
loadedActiveTokens = loadContainerTokens(state.it);
|
||||
assertEquals(currentKey, state.getCurrentMasterKey());
|
||||
assertEquals(prevKey, state.getPreviousMasterKey());
|
||||
assertTrue(state.getActiveTokens().isEmpty());
|
||||
assertTrue(loadedActiveTokens.isEmpty());
|
||||
|
||||
// store a few container tokens and verify recovered
|
||||
ContainerId cid1 = BuilderUtils.newContainerId(1, 1, 1, 1);
|
||||
|
@ -927,10 +1023,9 @@ public class TestNMLeveldbStateStoreService {
|
|||
stateStore.storeContainerToken(cid2, expTime2);
|
||||
restartStateStore();
|
||||
state = stateStore.loadContainerTokensState();
|
||||
loadedActiveTokens = loadContainerTokens(state.it);
|
||||
assertEquals(currentKey, state.getCurrentMasterKey());
|
||||
assertEquals(prevKey, state.getPreviousMasterKey());
|
||||
Map<ContainerId, Long> loadedActiveTokens =
|
||||
state.getActiveTokens();
|
||||
assertEquals(2, loadedActiveTokens.size());
|
||||
assertEquals(expTime1, loadedActiveTokens.get(cid1));
|
||||
assertEquals(expTime2, loadedActiveTokens.get(cid2));
|
||||
|
@ -948,9 +1043,9 @@ public class TestNMLeveldbStateStoreService {
|
|||
stateStore.storeContainerTokenCurrentMasterKey(currentKey);
|
||||
restartStateStore();
|
||||
state = stateStore.loadContainerTokensState();
|
||||
loadedActiveTokens = loadContainerTokens(state.it);
|
||||
assertEquals(currentKey, state.getCurrentMasterKey());
|
||||
assertEquals(prevKey, state.getPreviousMasterKey());
|
||||
loadedActiveTokens = state.getActiveTokens();
|
||||
assertEquals(2, loadedActiveTokens.size());
|
||||
assertNull(loadedActiveTokens.get(cid1));
|
||||
assertEquals(expTime2, loadedActiveTokens.get(cid2));
|
||||
|
@ -1029,8 +1124,8 @@ public class TestNMLeveldbStateStoreService {
|
|||
@Test
|
||||
public void testUnexpectedKeyDoesntThrowException() throws IOException {
|
||||
// test empty when no state
|
||||
List<RecoveredContainerState> recoveredContainers = stateStore
|
||||
.loadContainersState();
|
||||
List<RecoveredContainerState> recoveredContainers =
|
||||
loadContainersState(stateStore.getContainerStateIterator());
|
||||
assertTrue(recoveredContainers.isEmpty());
|
||||
|
||||
ApplicationId appId = ApplicationId.newInstance(1234, 3);
|
||||
|
@ -1045,7 +1140,8 @@ public class TestNMLeveldbStateStoreService {
|
|||
+ containerId.toString() + "/invalidKey1234").getBytes();
|
||||
stateStore.getDB().put(invalidKey, new byte[1]);
|
||||
restartStateStore();
|
||||
recoveredContainers = stateStore.loadContainersState();
|
||||
recoveredContainers =
|
||||
loadContainersState(stateStore.getContainerStateIterator());
|
||||
assertEquals(1, recoveredContainers.size());
|
||||
RecoveredContainerState rcs = recoveredContainers.get(0);
|
||||
assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
|
||||
|
@ -1162,8 +1258,8 @@ public class TestNMLeveldbStateStoreService {
|
|||
@Test
|
||||
public void testStateStoreForResourceMapping() throws IOException {
|
||||
// test empty when no state
|
||||
List<RecoveredContainerState> recoveredContainers = stateStore
|
||||
.loadContainersState();
|
||||
List<RecoveredContainerState> recoveredContainers =
|
||||
loadContainersState(stateStore.getContainerStateIterator());
|
||||
assertTrue(recoveredContainers.isEmpty());
|
||||
|
||||
ApplicationId appId = ApplicationId.newInstance(1234, 3);
|
||||
|
@ -1190,7 +1286,8 @@ public class TestNMLeveldbStateStoreService {
|
|||
|
||||
// add an invalid key
|
||||
restartStateStore();
|
||||
recoveredContainers = stateStore.loadContainersState();
|
||||
recoveredContainers =
|
||||
loadContainersState(stateStore.getContainerStateIterator());
|
||||
assertEquals(1, recoveredContainers.size());
|
||||
RecoveredContainerState rcs = recoveredContainers.get(0);
|
||||
List<Serializable> res = rcs.getResourceMappings()
|
||||
|
@ -1253,7 +1350,8 @@ public class TestNMLeveldbStateStoreService {
|
|||
stateStore.storeContainerRestartTimes(containerId,
|
||||
restartTimes);
|
||||
restartStateStore();
|
||||
RecoveredContainerState rcs = stateStore.loadContainersState().get(0);
|
||||
RecoveredContainerState rcs =
|
||||
loadContainersState(stateStore.getContainerStateIterator()).get(0);
|
||||
List<Long> recoveredRestartTimes = rcs.getRestartTimes();
|
||||
assertTrue(recoveredRestartTimes.isEmpty());
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue