YARN-5547. NMLeveldbStateStore should be more tolerant of unknown keys. Contributed by Ajith S
This commit is contained in:
parent
26c4cfb653
commit
1672a06135
|
@ -143,6 +143,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
|
||||
|
@ -394,6 +395,12 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
context.getContainers().put(containerId, container);
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ApplicationContainerInitEvent(container));
|
||||
if (rcs.getRecoveryType() == RecoveredContainerType.KILL) {
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerKillEvent(containerId, ContainerExitStatus.ABORTED,
|
||||
"Due to invalid StateStore info container was killed"
|
||||
+ " during recovery"));
|
||||
}
|
||||
} else {
|
||||
if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) {
|
||||
LOG.warn(containerId + " has no corresponding application!");
|
||||
|
|
|
@ -70,6 +70,8 @@ import org.iq80.leveldb.Options;
|
|||
import org.iq80.leveldb.WriteBatch;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ArrayListMultimap;
|
||||
import com.google.common.collect.ListMultimap;
|
||||
|
||||
public class NMLeveldbStateStoreService extends NMStateStoreService {
|
||||
|
||||
|
@ -138,6 +140,12 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
private boolean isNewlyCreated;
|
||||
private Timer compactionTimer;
|
||||
|
||||
/**
|
||||
* Map of containerID vs List of unknown key suffixes.
* An entry is added while container state is being loaded, whenever a key
* under a container's prefix has a suffix that is not recognized. The
* entries for a container are taken out of this map (removeAll) when that
* container's keys are deleted, so the unknown keys are deleted in the
* same write batch as the known ones.
|
||||
*/
|
||||
private ListMultimap<ContainerId, String> containerUnknownKeySuffixes =
|
||||
ArrayListMultimap.create();
|
||||
|
||||
/**
 * Creates the leveldb-backed state store, passing this class's
 * fully-qualified name to the {@link NMStateStoreService} constructor.
 */
public NMLeveldbStateStoreService() {
|
||||
super(NMLeveldbStateStoreService.class.getName());
|
||||
}
|
||||
|
@ -262,7 +270,11 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
} else if (suffix.equals(CONTAINER_LOG_DIR_KEY_SUFFIX)) {
|
||||
rcs.setLogDir(asString(entry.getValue()));
|
||||
} else {
|
||||
throw new IOException("Unexpected container state key: " + key);
|
||||
LOG.warn("the container " + containerId
|
||||
+ " will be killed because of the unknown key " + key
|
||||
+ " during recovery.");
|
||||
containerUnknownKeySuffixes.put(containerId, suffix);
|
||||
rcs.setRecoveryType(RecoveredContainerType.KILL);
|
||||
}
|
||||
}
|
||||
return rcs;
|
||||
|
@ -448,6 +460,11 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
batch.delete(bytes(keyPrefix + CONTAINER_LAUNCHED_KEY_SUFFIX));
|
||||
batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX));
|
||||
batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX));
|
||||
List<String> unknownKeysForContainer =
|
||||
containerUnknownKeySuffixes.removeAll(containerId);
|
||||
for (String unknownKeySuffix : unknownKeysForContainer) {
|
||||
batch.delete(bytes(keyPrefix + unknownKeySuffix));
|
||||
}
|
||||
db.write(batch);
|
||||
} finally {
|
||||
batch.close();
|
||||
|
|
|
@ -60,6 +60,13 @@ public abstract class NMStateStoreService extends AbstractService {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* Type of post recovery action.
* <ul>
* <li>KILL - set by the leveldb state store when an unknown key is found
* among a container's stored keys; the container manager then sends a
* kill event for the container right after re-creating it.</li>
* <li>RECOVER - the default for a recovered container state; no extra
* action is taken.</li>
* </ul>
|
||||
*/
|
||||
public enum RecoveredContainerType {
|
||||
KILL, RECOVER
|
||||
}
|
||||
|
||||
public enum RecoveredContainerStatus {
|
||||
REQUESTED,
|
||||
LAUNCHED,
|
||||
|
@ -77,6 +84,8 @@ public abstract class NMStateStoreService extends AbstractService {
|
|||
private String workDir;
|
||||
private String logDir;
|
||||
int version;
|
||||
private RecoveredContainerType recoveryType =
|
||||
RecoveredContainerType.RECOVER;
|
||||
|
||||
public RecoveredContainerStatus getStatus() {
|
||||
return status;
|
||||
|
@ -144,6 +153,14 @@ public abstract class NMStateStoreService extends AbstractService {
|
|||
.append(", LogDir: ").append(logDir)
|
||||
.toString();
|
||||
}
|
||||
|
||||
/**
 * Returns the action to take for this container after recovery.
 *
 * @return the recovery type; {@link RecoveredContainerType#RECOVER} unless
 *         changed through {@link #setRecoveryType(RecoveredContainerType)}
 */
public RecoveredContainerType getRecoveryType() {
|
||||
return recoveryType;
|
||||
}
|
||||
|
||||
/**
 * Sets the action to take for this container after recovery.
 *
 * @param recoveryType the post recovery action, e.g.
 *        {@link RecoveredContainerType#KILL} when the stored state
 *        contained an unknown key
 */
public void setRecoveryType(RecoveredContainerType recoveryType) {
|
||||
this.recoveryType = recoveryType;
|
||||
}
|
||||
}
|
||||
|
||||
public static class LocalResourceTrackerState {
|
||||
|
|
|
@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
|
|||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLogDeleterState;
|
||||
|
@ -935,6 +936,74 @@ public class TestNMLeveldbStateStoreService {
|
|||
store.close();
|
||||
}
|
||||
|
||||
/**
 * Verifies that an unrecognized key under a container's prefix does not
 * make container state loading fail. The test stores one container, writes
 * an extra key for it directly into the DB, restarts the state store and
 * then checks that (1) the container is still recovered, (2) it is marked
 * with {@link RecoveredContainerType#KILL}, and (3) the unknown key is
 * deleted from the DB once the container is removed.
 */
@Test
|
||||
public void testUnexpectedKeyDoesntThrowException() throws IOException {
|
||||
// test empty when no state
|
||||
List<RecoveredContainerState> recoveredContainers = stateStore
|
||||
.loadContainersState();
|
||||
assertTrue(recoveredContainers.isEmpty());
|
||||
|
||||
// create a container request
|
||||
ApplicationId appId = ApplicationId.newInstance(1234, 3);
|
||||
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
|
||||
4);
|
||||
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
|
||||
LocalResource lrsrc = LocalResource.newInstance(
|
||||
URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
|
||||
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
|
||||
1234567890L);
|
||||
Map<String, LocalResource> localResources =
|
||||
new HashMap<String, LocalResource>();
|
||||
localResources.put("rsrc", lrsrc);
|
||||
Map<String, String> env = new HashMap<String, String>();
|
||||
env.put("somevar", "someval");
|
||||
List<String> containerCmds = new ArrayList<String>();
|
||||
containerCmds.add("somecmd");
|
||||
containerCmds.add("somearg");
|
||||
Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
|
||||
serviceData.put("someservice",
|
||||
ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 }));
|
||||
ByteBuffer containerTokens = ByteBuffer
|
||||
.wrap(new byte[] { 0x7, 0x8, 0x9, 0xa });
|
||||
Map<ApplicationAccessType, String> acls =
|
||||
new HashMap<ApplicationAccessType, String>();
|
||||
acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
|
||||
acls.put(ApplicationAccessType.MODIFY_APP, "moduser");
|
||||
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
|
||||
localResources, env, containerCmds,
|
||||
serviceData, containerTokens, acls);
|
||||
Resource containerRsrc = Resource.newInstance(1357, 3);
|
||||
ContainerTokenIdentifier containerTokenId = new ContainerTokenIdentifier(
|
||||
containerId, "host", "user", containerRsrc, 9876543210L, 42, 2468,
|
||||
Priority.newInstance(7), 13579);
|
||||
Token containerToken = Token.newInstance(containerTokenId.getBytes(),
|
||||
ContainerTokenIdentifier.KIND.toString(), "password".getBytes(),
|
||||
"tokenservice");
|
||||
StartContainerRequest containerReq = StartContainerRequest.newInstance(clc,
|
||||
containerToken);
|
||||
|
||||
stateStore.storeContainer(containerId, 0, containerReq);
|
||||
|
||||
// add an invalid key
|
||||
byte[] invalidKey = ("ContainerManager/containers/"
|
||||
+ containerId.toString() + "/invalidKey1234").getBytes();
|
||||
stateStore.getDB().put(invalidKey, new byte[1]);
|
||||
restartStateStore();
|
||||
recoveredContainers = stateStore.loadContainersState();
|
||||
assertEquals(1, recoveredContainers.size());
|
||||
RecoveredContainerState rcs = recoveredContainers.get(0);
|
||||
assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
|
||||
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
|
||||
assertEquals(false, rcs.getKilled());
|
||||
assertEquals(containerReq, rcs.getStartRequest());
|
||||
assertTrue(rcs.getDiagnostics().isEmpty());
|
||||
assertEquals(RecoveredContainerType.KILL, rcs.getRecoveryType());
|
||||
// assert unknown keys are cleaned up finally
|
||||
assertNotNull(stateStore.getDB().get(invalidKey));
|
||||
stateStore.removeContainer(containerId);
|
||||
assertNull(stateStore.getDB().get(invalidKey));
|
||||
}
|
||||
|
||||
private static class NMTokenSecretManagerForTest extends
|
||||
BaseNMTokenSecretManager {
|
||||
public MasterKey generateKey() {
|
||||
|
|
Loading…
Reference in New Issue