YARN-2952. Fixed incorrect version check in StateStore. Contributed by Rohith Sharmaks

This commit is contained in:
Jian He 2014-12-19 16:56:30 -08:00
parent 954fb8581e
commit 808cba3821
7 changed files with 113 additions and 9 deletions

View File

@ -493,9 +493,9 @@ public class ShuffleHandler extends AuxiliaryService {
@VisibleForTesting @VisibleForTesting
Version loadVersion() throws IOException { Version loadVersion() throws IOException {
byte[] data = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY)); byte[] data = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY));
// if version is not stored previously, treat it as 1.0. // if version is not stored previously, treat it as CURRENT_VERSION_INFO.
if (data == null || data.length == 0) { if (data == null || data.length == 0) {
return Version.newInstance(1, 0); return getCurrentVersion();
} }
Version version = Version version =
new VersionPBImpl(VersionProto.parseFrom(data)); new VersionPBImpl(VersionProto.parseFrom(data));

View File

@ -253,6 +253,9 @@ Release 2.7.0 - UNRELEASED
YARN-2675. containersKilled metrics is not updated when the container is killed YARN-2675. containersKilled metrics is not updated when the container is killed
during localization. (Zhihai Xu via kasha) during localization. (Zhihai Xu via kasha)
YARN-2952. Fixed incorrect version check in StateStore. (Rohith Sharmaks
via jianhe)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -1637,9 +1637,9 @@ public class LeveldbTimelineStore extends AbstractService
Version loadVersion() throws IOException { Version loadVersion() throws IOException {
try { try {
byte[] data = db.get(bytes(TIMELINE_STORE_VERSION_KEY)); byte[] data = db.get(bytes(TIMELINE_STORE_VERSION_KEY));
// if version is not stored previously, treat it as 1.0. // if version is not stored previously, treat it as CURRENT_VERSION_INFO.
if (data == null || data.length == 0) { if (data == null || data.length == 0) {
return Version.newInstance(1, 0); return getCurrentVersion();
} }
Version version = Version version =
new VersionPBImpl(VersionProto.parseFrom(data)); new VersionPBImpl(VersionProto.parseFrom(data));

View File

@ -907,9 +907,9 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
Version loadVersion() throws IOException { Version loadVersion() throws IOException {
byte[] data = db.get(bytes(DB_SCHEMA_VERSION_KEY)); byte[] data = db.get(bytes(DB_SCHEMA_VERSION_KEY));
// if version is not stored previously, treat it as 1.0. // if version is not stored previously, treat it as CURRENT_VERSION_INFO.
if (data == null || data.length == 0) { if (data == null || data.length == 0) {
return Version.newInstance(1, 0); return getCurrentVersion();
} }
Version version = Version version =
new VersionPBImpl(VersionProto.parseFrom(data)); new VersionPBImpl(VersionProto.parseFrom(data));

View File

@ -365,7 +365,7 @@ public abstract class RMStateStore extends AbstractService {
* 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc. * 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
* 2) Any incompatible change of state-store is a major upgrade, and any * 2) Any incompatible change of state-store is a major upgrade, and any
* compatible change of state-store is a minor upgrade. * compatible change of state-store is a minor upgrade.
* 3) If theres's no version, treat it as 1.0. * 3) If theres's no version, treat it as CURRENT_VERSION_INFO.
* 4) Within a minor upgrade, say 1.1 to 1.2: * 4) Within a minor upgrade, say 1.1 to 1.2:
* overwrite the version info and proceed as normal. * overwrite the version info and proceed as normal.
* 5) Within a major upgrade, say 1.2 to 2.0: * 5) Within a major upgrade, say 1.2 to 2.0:
@ -378,9 +378,9 @@ public abstract class RMStateStore extends AbstractService {
if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) { if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
return; return;
} }
// if there is no version info, treat it as 1.0; // if there is no version info, treat it as CURRENT_VERSION_INFO;
if (loadedVersion == null) { if (loadedVersion == null) {
loadedVersion = Version.newInstance(1, 0); loadedVersion = getCurrentVersion();
} }
if (loadedVersion.isCompatibleTo(getCurrentVersion())) { if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
LOG.info("Storing RM state version info " + getCurrentVersion()); LOG.info("Storing RM state version info " + getCurrentVersion());

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.TestZKRMStateStore.TestZKRMStateStoreTester;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@ -166,6 +167,59 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
} }
} }
@Test(timeout = 60000)
public void testCheckMajorVersionChange() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
try {
fsTester = new TestFSRMStateStoreTester(cluster) {
Version VERSION_INFO = Version.newInstance(Integer.MAX_VALUE, 0);
@Override
public Version getCurrentVersion() throws Exception {
return VERSION_INFO;
}
@Override
public RMStateStore getRMStateStore() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
workingDirPathURI.toString());
conf.set(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC,
"100,6000");
this.store = new TestFileSystemRMStore(conf) {
Version storedVersion = null;
@Override
public Version getCurrentVersion() {
return VERSION_INFO;
}
@Override
protected synchronized Version loadVersion() throws Exception {
return storedVersion;
}
@Override
protected synchronized void storeVersion() throws Exception {
storedVersion = VERSION_INFO;
}
};
return store;
}
};
// default version
RMStateStore store = fsTester.getRMStateStore();
Version defaultVersion = fsTester.getCurrentVersion();
store.checkVersion();
Assert.assertEquals(defaultVersion, store.loadVersion());
} finally {
cluster.shutdown();
}
}
@Override @Override
protected void modifyAppState() throws Exception { protected void modifyAppState() throws Exception {
// imitate appAttemptFile1 is still .new, but old one is deleted // imitate appAttemptFile1 is still .new, but old one is deleted

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSec
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.Stat;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
public class TestZKRMStateStore extends RMStateStoreTestBase { public class TestZKRMStateStore extends RMStateStoreTestBase {
@ -144,6 +145,52 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
testAMRMTokenSecretManagerStateStore(zkTester); testAMRMTokenSecretManagerStateStore(zkTester);
} }
@Test (timeout = 60000)
public void testCheckMajorVersionChange() throws Exception {
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester() {
Version VERSION_INFO = Version.newInstance(Integer.MAX_VALUE, 0);
@Override
public Version getCurrentVersion() throws Exception {
return VERSION_INFO;
}
@Override
public RMStateStore getRMStateStore() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
workingZnode = "/Test";
conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
this.client = createClient();
this.store = new TestZKRMStateStoreInternal(conf, workingZnode) {
Version storedVersion = null;
@Override
public Version getCurrentVersion() {
return VERSION_INFO;
}
@Override
protected synchronized Version loadVersion() throws Exception {
return storedVersion;
}
@Override
protected synchronized void storeVersion() throws Exception {
storedVersion = VERSION_INFO;
}
};
return this.store;
}
};
// default version
RMStateStore store = zkTester.getRMStateStore();
Version defaultVersion = zkTester.getCurrentVersion();
store.checkVersion();
Assert.assertEquals(defaultVersion, store.loadVersion());
}
private Configuration createHARMConf( private Configuration createHARMConf(
String rmIds, String rmId, int adminPort) { String rmIds, String rmId, int adminPort) {
Configuration conf = new YarnConfiguration(); Configuration conf = new YarnConfiguration();