From 6dfdcf094d196fbf376129ec31a889632c3a30ae Mon Sep 17 00:00:00 2001 From: Jian He Date: Fri, 19 Dec 2014 16:56:30 -0800 Subject: [PATCH] YARN-2952. Fixed incorrect version check in StateStore. Contributed by Rohith Sharmaks (cherry picked from commit 808cba3821d5bc4267f69d14220757f01cd55715) (cherry picked from commit 9180d11b3bbb2a49127d5d25f53b38c5113bf7ea) --- .../apache/hadoop/mapred/ShuffleHandler.java | 4 +- hadoop-yarn-project/CHANGES.txt | 3 ++ .../server/timeline/LeveldbTimelineStore.java | 4 +- .../recovery/NMLeveldbStateStoreService.java | 4 +- .../recovery/RMStateStore.java | 6 +-- .../recovery/TestFSRMStateStore.java | 54 +++++++++++++++++++ .../recovery/TestZKRMStateStore.java | 47 ++++++++++++++++ 7 files changed, 113 insertions(+), 9 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java index 33220c6cbdb..6e069f19263 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java @@ -493,9 +493,9 @@ public class ShuffleHandler extends AuxiliaryService { @VisibleForTesting Version loadVersion() throws IOException { byte[] data = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY)); - // if version is not stored previously, treat it as 1.0. + // if version is not stored previously, treat it as CURRENT_VERSION_INFO. if (data == null || data.length == 0) { - return Version.newInstance(1, 0); + return getCurrentVersion(); } Version version = new VersionPBImpl(VersionProto.parseFrom(data)); diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 0cbbd95f30c..02c9bc41227 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -51,6 +51,9 @@ Release 2.6.1 - UNRELEASED YARN-1984. LeveldbTimelineStore does not handle db exceptions properly (Varun Saxena via jlowe) + YARN-2952. Fixed incorrect version check in StateStore. (Rohith Sharmaks + via jianhe) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java index 33deb80453b..b87c82103a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java @@ -1610,9 +1610,9 @@ public class LeveldbTimelineStore extends AbstractService Version loadVersion() throws IOException { try { byte[] data = db.get(bytes(TIMELINE_STORE_VERSION_KEY)); - // if version is not stored previously, treat it as 1.0. + // if version is not stored previously, treat it as CURRENT_VERSION_INFO. if (data == null || data.length == 0) { - return Version.newInstance(1, 0); + return getCurrentVersion(); } Version version = new VersionPBImpl(VersionProto.parseFrom(data)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 9d5468845da..5f349dbe359 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -907,9 +907,9 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { Version loadVersion() throws IOException { byte[] data = db.get(bytes(DB_SCHEMA_VERSION_KEY)); - // if version is not stored previously, treat it as 1.0. + // if version is not stored previously, treat it as CURRENT_VERSION_INFO. if (data == null || data.length == 0) { - return Version.newInstance(1, 0); + return getCurrentVersion(); } Version version = new VersionPBImpl(VersionProto.parseFrom(data)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index beac4032e2f..983cc81b022 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -507,7 +507,7 @@ public abstract class RMStateStore extends AbstractService { * 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc. * 2) Any incompatible change of state-store is a major upgrade, and any * compatible change of state-store is a minor upgrade. - * 3) If theres's no version, treat it as 1.0. + * 3) If theres's no version, treat it as CURRENT_VERSION_INFO. * 4) Within a minor upgrade, say 1.1 to 1.2: * overwrite the version info and proceed as normal. * 5) Within a major upgrade, say 1.2 to 2.0: @@ -520,9 +520,9 @@ public abstract class RMStateStore extends AbstractService { if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) { return; } - // if there is no version info, treat it as 1.0; + // if there is no version info, treat it as CURRENT_VERSION_INFO; if (loadedVersion == null) { - loadedVersion = Version.newInstance(1, 0); + loadedVersion = getCurrentVersion(); } if (loadedVersion.isCompatibleTo(getCurrentVersion())) { LOG.info("Storing RM state version info " + getCurrentVersion()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index 88e5393f14d..d0d19e31031 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.TestZKRMStateStore.TestZKRMStateStoreTester; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @@ -166,6 +167,59 @@ public class TestFSRMStateStore extends RMStateStoreTestBase { } } + @Test(timeout = 60000) + public void testCheckMajorVersionChange() throws Exception { + HdfsConfiguration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + try { + fsTester = new TestFSRMStateStoreTester(cluster) { + Version VERSION_INFO = Version.newInstance(Integer.MAX_VALUE, 0); + + @Override + public Version getCurrentVersion() throws Exception { + return VERSION_INFO; + } + + @Override + public RMStateStore getRMStateStore() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI, + workingDirPathURI.toString()); + conf.set(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC, + "100,6000"); + this.store = new TestFileSystemRMStore(conf) { + Version storedVersion = null; + + @Override + public Version getCurrentVersion() { + return VERSION_INFO; + } + + @Override + protected synchronized Version loadVersion() throws Exception { + return storedVersion; + } + + @Override + protected synchronized void storeVersion() throws Exception { + storedVersion = VERSION_INFO; + } + }; + return store; + } + }; + + // default version + RMStateStore store = fsTester.getRMStateStore(); + Version defaultVersion = fsTester.getCurrentVersion(); + store.checkVersion(); + Assert.assertEquals(defaultVersion, store.loadVersion()); + } finally { + cluster.shutdown(); + } + } + @Override protected void modifyAppState() throws Exception { // imitate appAttemptFile1 is still .new, but old one is deleted diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index 3c7170adaeb..6af0edd801f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; +import org.junit.Assert; import org.junit.Test; public class TestZKRMStateStore extends RMStateStoreTestBase { @@ -126,6 +127,52 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { testAMRMTokenSecretManagerStateStore(zkTester); } + @Test (timeout = 60000) + public void testCheckMajorVersionChange() throws Exception { + TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester() { + Version VERSION_INFO = Version.newInstance(Integer.MAX_VALUE, 0); + + @Override + public Version getCurrentVersion() throws Exception { + return VERSION_INFO; + } + + @Override + public RMStateStore getRMStateStore() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + workingZnode = "/Test"; + conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort); + conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); + this.client = createClient(); + this.store = new TestZKRMStateStoreInternal(conf, workingZnode) { + Version storedVersion = null; + + @Override + public Version getCurrentVersion() { + return VERSION_INFO; + } + + @Override + protected synchronized Version loadVersion() throws Exception { + return storedVersion; + } + + @Override + protected synchronized void storeVersion() throws Exception { + storedVersion = VERSION_INFO; + } + }; + return this.store; + } + + }; + // default version + RMStateStore store = zkTester.getRMStateStore(); + Version defaultVersion = zkTester.getCurrentVersion(); + store.checkVersion(); + Assert.assertEquals(defaultVersion, store.loadVersion()); + } + private Configuration createHARMConf( String rmIds, String rmId, int adminPort) { Configuration conf = new YarnConfiguration();