YARN-5630. NM fails to start after downgrade from 2.8 to 2.7. Contributed by Jason Lowe
This commit is contained in:
parent
729de3e6b6
commit
e793309735
|
@ -277,21 +277,24 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
||||||
@Override
|
@Override
|
||||||
public void storeContainer(ContainerId containerId, int containerVersion,
|
public void storeContainer(ContainerId containerId, int containerVersion,
|
||||||
StartContainerRequest startRequest) throws IOException {
|
StartContainerRequest startRequest) throws IOException {
|
||||||
|
String idStr = containerId.toString();
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("storeContainer: containerId= " + containerId
|
LOG.debug("storeContainer: containerId= " + idStr
|
||||||
+ ", startRequest= " + startRequest);
|
+ ", startRequest= " + startRequest);
|
||||||
}
|
}
|
||||||
String keyRequest = CONTAINERS_KEY_PREFIX + containerId.toString()
|
String keyRequest = CONTAINERS_KEY_PREFIX + idStr
|
||||||
+ CONTAINER_REQUEST_KEY_SUFFIX;
|
+ CONTAINER_REQUEST_KEY_SUFFIX;
|
||||||
String keyVersion = CONTAINERS_KEY_PREFIX + containerId.toString()
|
String keyVersion = getContainerVersionKey(idStr);
|
||||||
+ CONTAINER_VERSION_KEY_SUFFIX;
|
|
||||||
try {
|
try {
|
||||||
WriteBatch batch = db.createWriteBatch();
|
WriteBatch batch = db.createWriteBatch();
|
||||||
try {
|
try {
|
||||||
batch.put(bytes(keyRequest),
|
batch.put(bytes(keyRequest),
|
||||||
((StartContainerRequestPBImpl) startRequest)
|
((StartContainerRequestPBImpl) startRequest)
|
||||||
.getProto().toByteArray());
|
.getProto().toByteArray());
|
||||||
batch.put(bytes(keyVersion), bytes(Integer.toString(containerVersion)));
|
if (containerVersion != 0) {
|
||||||
|
batch.put(bytes(keyVersion),
|
||||||
|
bytes(Integer.toString(containerVersion)));
|
||||||
|
}
|
||||||
db.write(batch);
|
db.write(batch);
|
||||||
} finally {
|
} finally {
|
||||||
batch.close();
|
batch.close();
|
||||||
|
@ -301,6 +304,11 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
String getContainerVersionKey(String containerId) {
|
||||||
|
return CONTAINERS_KEY_PREFIX + containerId + CONTAINER_VERSION_KEY_SUFFIX;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void storeContainerQueued(ContainerId containerId) throws IOException {
|
public void storeContainerQueued(ContainerId containerId) throws IOException {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
@ -1222,6 +1230,11 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
||||||
return CURRENT_VERSION_INFO;
|
return CURRENT_VERSION_INFO;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
DB getDB() {
|
||||||
|
return db;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
|
* 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
|
||||||
* 2) Any incompatible change of state-store is a major upgrade, and any
|
* 2) Any incompatible change of state-store is a major upgrade, and any
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.recovery;
|
package org.apache.hadoop.yarn.server.nodemanager.recovery;
|
||||||
|
|
||||||
|
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
@ -78,7 +79,6 @@ import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
|
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
||||||
import org.iq80.leveldb.DB;
|
import org.iq80.leveldb.DB;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -226,48 +226,21 @@ public class TestNMLeveldbStateStoreService {
|
||||||
ApplicationAttemptId appAttemptId =
|
ApplicationAttemptId appAttemptId =
|
||||||
ApplicationAttemptId.newInstance(appId, 4);
|
ApplicationAttemptId.newInstance(appId, 4);
|
||||||
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
|
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
|
||||||
LocalResource lrsrc = LocalResource.newInstance(
|
StartContainerRequest containerReq = createContainerRequest(containerId);
|
||||||
URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
|
|
||||||
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
|
|
||||||
1234567890L);
|
|
||||||
Map<String, LocalResource> localResources =
|
|
||||||
new HashMap<String, LocalResource>();
|
|
||||||
localResources.put("rsrc", lrsrc);
|
|
||||||
Map<String, String> env = new HashMap<String, String>();
|
|
||||||
env.put("somevar", "someval");
|
|
||||||
List<String> containerCmds = new ArrayList<String>();
|
|
||||||
containerCmds.add("somecmd");
|
|
||||||
containerCmds.add("somearg");
|
|
||||||
Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
|
|
||||||
serviceData.put("someservice",
|
|
||||||
ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 }));
|
|
||||||
ByteBuffer containerTokens =
|
|
||||||
ByteBuffer.wrap(new byte[] { 0x7, 0x8, 0x9, 0xa });
|
|
||||||
Map<ApplicationAccessType, String> acls =
|
|
||||||
new HashMap<ApplicationAccessType, String>();
|
|
||||||
acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
|
|
||||||
acls.put(ApplicationAccessType.MODIFY_APP, "moduser");
|
|
||||||
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
|
|
||||||
localResources, env, containerCmds, serviceData, containerTokens,
|
|
||||||
acls);
|
|
||||||
Resource containerRsrc = Resource.newInstance(1357, 3);
|
|
||||||
ContainerTokenIdentifier containerTokenId =
|
|
||||||
new ContainerTokenIdentifier(containerId, "host", "user",
|
|
||||||
containerRsrc, 9876543210L, 42, 2468, Priority.newInstance(7),
|
|
||||||
13579);
|
|
||||||
Token containerToken = Token.newInstance(containerTokenId.getBytes(),
|
|
||||||
ContainerTokenIdentifier.KIND.toString(), "password".getBytes(),
|
|
||||||
"tokenservice");
|
|
||||||
StartContainerRequest containerReq =
|
|
||||||
StartContainerRequest.newInstance(clc, containerToken);
|
|
||||||
|
|
||||||
// store a container and verify recovered
|
// store a container and verify recovered
|
||||||
stateStore.storeContainer(containerId, 1, containerReq);
|
stateStore.storeContainer(containerId, 0, containerReq);
|
||||||
|
|
||||||
|
// verify the container version key is not stored for new containers
|
||||||
|
DB db = stateStore.getDB();
|
||||||
|
assertNull("version key present for new container", db.get(bytes(
|
||||||
|
stateStore.getContainerVersionKey(containerId.toString()))));
|
||||||
|
|
||||||
restartStateStore();
|
restartStateStore();
|
||||||
recoveredContainers = stateStore.loadContainersState();
|
recoveredContainers = stateStore.loadContainersState();
|
||||||
assertEquals(1, recoveredContainers.size());
|
assertEquals(1, recoveredContainers.size());
|
||||||
RecoveredContainerState rcs = recoveredContainers.get(0);
|
RecoveredContainerState rcs = recoveredContainers.get(0);
|
||||||
assertEquals(1, rcs.getVersion());
|
assertEquals(0, rcs.getVersion());
|
||||||
assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
|
assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
|
||||||
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
|
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
|
||||||
assertEquals(false, rcs.getKilled());
|
assertEquals(false, rcs.getKilled());
|
||||||
|
@ -368,6 +341,43 @@ public class TestNMLeveldbStateStoreService {
|
||||||
assertTrue(recoveredContainers.isEmpty());
|
assertTrue(recoveredContainers.isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private StartContainerRequest createContainerRequest(
|
||||||
|
ContainerId containerId) {
|
||||||
|
LocalResource lrsrc = LocalResource.newInstance(
|
||||||
|
URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
|
||||||
|
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
|
||||||
|
1234567890L);
|
||||||
|
Map<String, LocalResource> localResources =
|
||||||
|
new HashMap<String, LocalResource>();
|
||||||
|
localResources.put("rsrc", lrsrc);
|
||||||
|
Map<String, String> env = new HashMap<String, String>();
|
||||||
|
env.put("somevar", "someval");
|
||||||
|
List<String> containerCmds = new ArrayList<String>();
|
||||||
|
containerCmds.add("somecmd");
|
||||||
|
containerCmds.add("somearg");
|
||||||
|
Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
|
||||||
|
serviceData.put("someservice",
|
||||||
|
ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 }));
|
||||||
|
ByteBuffer containerTokens =
|
||||||
|
ByteBuffer.wrap(new byte[] { 0x7, 0x8, 0x9, 0xa });
|
||||||
|
Map<ApplicationAccessType, String> acls =
|
||||||
|
new HashMap<ApplicationAccessType, String>();
|
||||||
|
acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
|
||||||
|
acls.put(ApplicationAccessType.MODIFY_APP, "moduser");
|
||||||
|
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
|
||||||
|
localResources, env, containerCmds, serviceData, containerTokens,
|
||||||
|
acls);
|
||||||
|
Resource containerRsrc = Resource.newInstance(1357, 3);
|
||||||
|
ContainerTokenIdentifier containerTokenId =
|
||||||
|
new ContainerTokenIdentifier(containerId, "host", "user",
|
||||||
|
containerRsrc, 9876543210L, 42, 2468, Priority.newInstance(7),
|
||||||
|
13579);
|
||||||
|
Token containerToken = Token.newInstance(containerTokenId.getBytes(),
|
||||||
|
ContainerTokenIdentifier.KIND.toString(), "password".getBytes(),
|
||||||
|
"tokenservice");
|
||||||
|
return StartContainerRequest.newInstance(clc, containerToken);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testStartResourceLocalization() throws IOException {
|
public void testStartResourceLocalization() throws IOException {
|
||||||
String user = "somebody";
|
String user = "somebody";
|
||||||
|
|
Loading…
Reference in New Issue