YARN-2288. Made persisted data in LevelDB timeline store be versioned. Contributed by Junping Du.

svn merge --ignore-ancestry -c 1616540 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1616541 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhijie Shen 2014-08-07 17:03:58 +00:00
parent 5a056789ad
commit c6c4a74bef
3 changed files with 124 additions and 6 deletions

View File

@ -73,6 +73,9 @@ Release 2.6.0 - UNRELEASED
YARN-2298. Move TimelineClient to yarn-common project (Zhijie Shen via YARN-2298. Move TimelineClient to yarn-common project (Zhijie Shen via
junping_du) junping_du)
YARN-2288. Made persisted data in LevelDB timeline store be versioned. (Junping Du
via zjshen)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.timeline;
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong; import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong; import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.File; import java.io.File;
@ -60,8 +61,12 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEnt
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.fusesource.leveldbjni.JniDBFactory; import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB; import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.DBIterator; import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options; import org.iq80.leveldb.Options;
import org.iq80.leveldb.ReadOptions; import org.iq80.leveldb.ReadOptions;
@ -142,6 +147,11 @@ public class LeveldbTimelineStore extends AbstractService
private static final byte[] EMPTY_BYTES = new byte[0]; private static final byte[] EMPTY_BYTES = new byte[0];
private static final String TIMELINE_STORE_VERSION_KEY = "timeline-store-version";
private static final Version CURRENT_VERSION_INFO = Version
.newInstance(1, 0);
@Private @Private
@VisibleForTesting @VisibleForTesting
static final FsPermission LEVELDB_DIR_UMASK = FsPermission static final FsPermission LEVELDB_DIR_UMASK = FsPermission
@ -193,6 +203,7 @@ public class LeveldbTimelineStore extends AbstractService
} }
LOG.info("Using leveldb path " + dbPath); LOG.info("Using leveldb path " + dbPath);
db = factory.open(new File(dbPath.toString()), options); db = factory.open(new File(dbPath.toString()), options);
checkVersion();
startTimeWriteCache = startTimeWriteCache =
Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize( Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize(
conf))); conf)));
@ -1270,8 +1281,6 @@ public class LeveldbTimelineStore extends AbstractService
DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE); DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE);
} }
// warning is suppressed to prevent eclipse from noting unclosed resource
@SuppressWarnings("resource")
@VisibleForTesting @VisibleForTesting
List<String> getEntityTypes() throws IOException { List<String> getEntityTypes() throws IOException {
DBIterator iterator = null; DBIterator iterator = null;
@ -1489,4 +1498,65 @@ public class LeveldbTimelineStore extends AbstractService
readOptions.fillCache(fillCache); readOptions.fillCache(fillCache);
return db.iterator(readOptions); return db.iterator(readOptions);
} }
Version loadVersion() throws IOException {
byte[] data = db.get(bytes(TIMELINE_STORE_VERSION_KEY));
// if version is not stored previously, treat it as 1.0.
if (data == null || data.length == 0) {
return Version.newInstance(1, 0);
}
Version version =
new VersionPBImpl(VersionProto.parseFrom(data));
return version;
}
// Only used for test
@VisibleForTesting
void storeVersion(Version state) throws IOException {
dbStoreVersion(state);
}
private void dbStoreVersion(Version state) throws IOException {
String key = TIMELINE_STORE_VERSION_KEY;
byte[] data =
((VersionPBImpl) state).getProto().toByteArray();
try {
db.put(bytes(key), data);
} catch (DBException e) {
throw new IOException(e);
}
}
Version getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
/**
* 1) Versioning timeline store: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
* 2) Any incompatible change of TS-store is a major upgrade, and any
* compatible change of TS-store is a minor upgrade.
* 3) Within a minor upgrade, say 1.1 to 1.2:
* overwrite the version info and proceed as normal.
* 4) Within a major upgrade, say 1.2 to 2.0:
* throw exception and indicate user to use a separate upgrade tool to
* upgrade timeline store or remove incompatible old state.
*/
private void checkVersion() throws IOException {
Version loadedVersion = loadVersion();
LOG.info("Loaded timeline store version info " + loadedVersion);
if (loadedVersion.equals(getCurrentVersion())) {
return;
}
if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
LOG.info("Storing timeline store version info " + getCurrentVersion());
dbStoreVersion(CURRENT_VERSION_INFO);
} else {
String incompatibleMessage =
"Incompatible version for timeline store: expecting version "
+ getCurrentVersion() + ", but loading version " + loadedVersion;
LOG.fatal(incompatibleMessage);
throw new IOException(incompatibleMessage);
}
}
} }

View File

@ -36,14 +36,17 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore; import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
import org.apache.hadoop.yarn.server.timeline.NameValuePair; import org.apache.hadoop.yarn.server.timeline.NameValuePair;
import org.iq80.leveldb.DBIterator; import org.iq80.leveldb.DBIterator;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -52,19 +55,19 @@ import org.junit.Test;
public class TestLeveldbTimelineStore extends TimelineStoreTestUtils { public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
private FileContext fsContext; private FileContext fsContext;
private File fsPath; private File fsPath;
private Configuration config = new YarnConfiguration();
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
fsContext = FileContext.getLocalFSFileContext(); fsContext = FileContext.getLocalFSFileContext();
Configuration conf = new YarnConfiguration();
fsPath = new File("target", this.getClass().getSimpleName() + fsPath = new File("target", this.getClass().getSimpleName() +
"-tmpDir").getAbsoluteFile(); "-tmpDir").getAbsoluteFile();
fsContext.delete(new Path(fsPath.getAbsolutePath()), true); fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH, config.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
fsPath.getAbsolutePath()); fsPath.getAbsolutePath());
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false); config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);
store = new LeveldbTimelineStore(); store = new LeveldbTimelineStore();
store.init(conf); store.init(config);
store.start(); store.start();
loadTestData(); loadTestData();
loadVerificationData(); loadVerificationData();
@ -264,4 +267,46 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
assertEquals(2, getEntitiesWithPrimaryFilter("type_1", userFilter).size()); assertEquals(2, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
} }
@Test
public void testCheckVersion() throws IOException {
LeveldbTimelineStore dbStore = (LeveldbTimelineStore) store;
// default version
Version defaultVersion = dbStore.getCurrentVersion();
Assert.assertEquals(defaultVersion, dbStore.loadVersion());
// compatible version
Version compatibleVersion =
Version.newInstance(defaultVersion.getMajorVersion(),
defaultVersion.getMinorVersion() + 2);
dbStore.storeVersion(compatibleVersion);
Assert.assertEquals(compatibleVersion, dbStore.loadVersion());
restartTimelineStore();
dbStore = (LeveldbTimelineStore) store;
// overwrite the compatible version
Assert.assertEquals(defaultVersion, dbStore.loadVersion());
// incompatible version
Version incompatibleVersion =
Version.newInstance(defaultVersion.getMajorVersion() + 1,
defaultVersion.getMinorVersion());
dbStore.storeVersion(incompatibleVersion);
try {
restartTimelineStore();
Assert.fail("Incompatible version, should expect fail here.");
} catch (ServiceStateException e) {
Assert.assertTrue("Exception message mismatch",
e.getMessage().contains("Incompatible version for timeline store"));
}
}
private void restartTimelineStore() throws IOException {
// need to close so leveldb releases database lock
if (store != null) {
store.close();
}
store = new LeveldbTimelineStore();
store.init(config);
store.start();
}
} }