YARN-2816. NM fails to start with an NPE during container recovery. Contributed by Zhihai Xu.
(cherry picked from commit 49c38898b0)
(cherry picked from commit ad140d1fc8)
(cherry picked from commit 85b23c323c80c5303bd0b7bdb066258792ca67d8)
This commit is contained in:
parent
81ba30211e
commit
f83d898944
|
@ -15,6 +15,9 @@ Release 2.6.1 - UNRELEASED
|
|||
YARN-2856. Fixed RMAppImpl to handle ATTEMPT_KILLED event at ACCEPTED state
|
||||
on app recovery. (Rohith Sharmaks via jianhe)
|
||||
|
||||
YARN-2816. NM fail to start with NPE during container recovery (Zhihai Xu
|
||||
via jlowe)
|
||||
|
||||
Release 2.6.0 - 2014-11-18
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -146,6 +146,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
throws IOException {
|
||||
ArrayList<RecoveredContainerState> containers =
|
||||
new ArrayList<RecoveredContainerState>();
|
||||
ArrayList<ContainerId> containersToRemove =
|
||||
new ArrayList<ContainerId>();
|
||||
LeveldbIterator iter = null;
|
||||
try {
|
||||
iter = new LeveldbIterator(db);
|
||||
|
@ -165,7 +167,14 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
ContainerId containerId = ConverterUtils.toContainerId(
|
||||
key.substring(CONTAINERS_KEY_PREFIX.length(), idEndPos));
|
||||
String keyPrefix = key.substring(0, idEndPos+1);
|
||||
containers.add(loadContainerState(containerId, iter, keyPrefix));
|
||||
RecoveredContainerState rcs = loadContainerState(containerId,
|
||||
iter, keyPrefix);
|
||||
// Don't load container without StartContainerRequest
|
||||
if (rcs.startRequest != null) {
|
||||
containers.add(rcs);
|
||||
} else {
|
||||
containersToRemove.add(containerId);
|
||||
}
|
||||
}
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e);
|
||||
|
@ -175,6 +184,19 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
}
|
||||
}
|
||||
|
||||
// remove container without StartContainerRequest
|
||||
for (ContainerId containerId : containersToRemove) {
|
||||
LOG.warn("Remove container " + containerId +
|
||||
" with incomplete records");
|
||||
try {
|
||||
removeContainer(containerId);
|
||||
// TODO: kill and cleanup the leaked container
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to remove container " + containerId +
|
||||
" in store", e);
|
||||
}
|
||||
}
|
||||
|
||||
return containers;
|
||||
}
|
||||
|
||||
|
|
|
@ -274,6 +274,13 @@ public class TestNMLeveldbStateStoreService {
|
|||
assertEquals(containerReq, rcs.getStartRequest());
|
||||
assertTrue(rcs.getDiagnostics().isEmpty());
|
||||
|
||||
// store a new container record without StartContainerRequest
|
||||
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6);
|
||||
stateStore.storeContainerLaunched(containerId1);
|
||||
recoveredContainers = stateStore.loadContainersState();
|
||||
// check whether the new container record is discarded
|
||||
assertEquals(1, recoveredContainers.size());
|
||||
|
||||
// launch the container, add some diagnostics, and verify recovered
|
||||
StringBuilder diags = new StringBuilder();
|
||||
stateStore.storeContainerLaunched(containerId);
|
||||
|
|
Loading…
Reference in New Issue