diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a686a094917..8283d48d6ae 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -91,6 +91,9 @@ Release 2.6.0 - UNRELEASED YARN-2298. Move TimelineClient to yarn-common project (Zhijie Shen via junping_du) + YARN-2288. Made persisted data in LevelDB timeline store be versioned. (Junping Du + via zjshen) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java index 94957009fba..b0feac1671e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.timeline; import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong; import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong; +import static org.fusesource.leveldbjni.JniDBFactory.bytes; import java.io.ByteArrayOutputStream; import java.io.File; @@ -60,8 +61,12 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEnt import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.fusesource.leveldbjni.JniDBFactory; import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBException; import org.iq80.leveldb.DBIterator; import org.iq80.leveldb.Options; import org.iq80.leveldb.ReadOptions; @@ -141,6 +146,11 @@ public class LeveldbTimelineStore extends AbstractService "z".getBytes(); private static final byte[] EMPTY_BYTES = new byte[0]; + + private static final String TIMELINE_STORE_VERSION_KEY = "timeline-store-version"; + + private static final Version CURRENT_VERSION_INFO = Version + .newInstance(1, 0); @Private @VisibleForTesting @@ -193,6 +203,7 @@ public class LeveldbTimelineStore extends AbstractService } LOG.info("Using leveldb path " + dbPath); db = factory.open(new File(dbPath.toString()), options); + checkVersion(); startTimeWriteCache = Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize( conf))); @@ -1270,8 +1281,6 @@ public class LeveldbTimelineStore extends AbstractService DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE); } - // warning is suppressed to prevent eclipse from noting unclosed resource - @SuppressWarnings("resource") @VisibleForTesting List getEntityTypes() throws IOException { DBIterator iterator = null; @@ -1489,4 +1498,65 @@ public class LeveldbTimelineStore extends AbstractService readOptions.fillCache(fillCache); return db.iterator(readOptions); } + + Version loadVersion() throws IOException { + byte[] data = db.get(bytes(TIMELINE_STORE_VERSION_KEY)); + // if version is not stored previously, treat it as 1.0. + if (data == null || data.length == 0) { + return Version.newInstance(1, 0); + } + Version version = + new VersionPBImpl(VersionProto.parseFrom(data)); + return version; + } + + // Only used for test + @VisibleForTesting + void storeVersion(Version state) throws IOException { + dbStoreVersion(state); + } + + private void dbStoreVersion(Version state) throws IOException { + String key = TIMELINE_STORE_VERSION_KEY; + byte[] data = + ((VersionPBImpl) state).getProto().toByteArray(); + try { + db.put(bytes(key), data); + } catch (DBException e) { + throw new IOException(e); + } + } + + Version getCurrentVersion() { + return CURRENT_VERSION_INFO; + } + + /** + * 1) Versioning timeline store: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc. + * 2) Any incompatible change of TS-store is a major upgrade, and any + * compatible change of TS-store is a minor upgrade. + * 3) Within a minor upgrade, say 1.1 to 1.2: + * overwrite the version info and proceed as normal. + * 4) Within a major upgrade, say 1.2 to 2.0: + * throw exception and indicate user to use a separate upgrade tool to + * upgrade timeline store or remove incompatible old state. + */ + private void checkVersion() throws IOException { + Version loadedVersion = loadVersion(); + LOG.info("Loaded timeline store version info " + loadedVersion); + if (loadedVersion.equals(getCurrentVersion())) { + return; + } + if (loadedVersion.isCompatibleTo(getCurrentVersion())) { + LOG.info("Storing timeline store version info " + getCurrentVersion()); + dbStoreVersion(CURRENT_VERSION_INFO); + } else { + String incompatibleMessage = + "Incompatible version for timeline store: expecting version " + + getCurrentVersion() + ", but loading version " + loadedVersion; + LOG.fatal(incompatibleMessage); + throw new IOException(incompatibleMessage); + } + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java index 2adfeaf8dba..0c6e082c340 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java @@ -36,14 +36,17 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.service.ServiceStateException; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore; import org.apache.hadoop.yarn.server.timeline.NameValuePair; import org.iq80.leveldb.DBIterator; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -52,19 +55,19 @@ import org.junit.Test; public class TestLeveldbTimelineStore extends TimelineStoreTestUtils { private FileContext fsContext; private File fsPath; + private Configuration config = new YarnConfiguration(); @Before public void setup() throws Exception { fsContext = FileContext.getLocalFSFileContext(); - Configuration conf = new YarnConfiguration(); fsPath = new File("target", this.getClass().getSimpleName() + "-tmpDir").getAbsoluteFile(); fsContext.delete(new Path(fsPath.getAbsolutePath()), true); - conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH, + config.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH, fsPath.getAbsolutePath()); - conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false); + config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false); store = new LeveldbTimelineStore(); - store.init(conf); + store.init(config); store.start(); loadTestData(); loadVerificationData(); @@ -263,5 +266,47 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils { assertEquals(1, getEntities("type_2").size()); assertEquals(2, getEntitiesWithPrimaryFilter("type_1", userFilter).size()); } + + @Test + public void testCheckVersion() throws IOException { + LeveldbTimelineStore dbStore = (LeveldbTimelineStore) store; + // default version + Version defaultVersion = dbStore.getCurrentVersion(); + Assert.assertEquals(defaultVersion, dbStore.loadVersion()); + + // compatible version + Version compatibleVersion = + Version.newInstance(defaultVersion.getMajorVersion(), + defaultVersion.getMinorVersion() + 2); + dbStore.storeVersion(compatibleVersion); + Assert.assertEquals(compatibleVersion, dbStore.loadVersion()); + restartTimelineStore(); + dbStore = (LeveldbTimelineStore) store; + // overwrite the compatible version + Assert.assertEquals(defaultVersion, dbStore.loadVersion()); + + // incompatible version + Version incompatibleVersion = + Version.newInstance(defaultVersion.getMajorVersion() + 1, + defaultVersion.getMinorVersion()); + dbStore.storeVersion(incompatibleVersion); + try { + restartTimelineStore(); + Assert.fail("Incompatible version, should expect fail here."); + } catch (ServiceStateException e) { + Assert.assertTrue("Exception message mismatch", + e.getMessage().contains("Incompatible version for timeline store")); + } + } + + private void restartTimelineStore() throws IOException { + // need to close so leveldb releases database lock + if (store != null) { + store.close(); + } + store = new LeveldbTimelineStore(); + store.init(config); + store.start(); + } }