YARN-1730. Implemented simple write-locking in the LevelDB based timeline-store. Contributed by Billie Rinaldi.

svn merge --ignore-ancestry -c 1574145 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1574146 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-03-04 17:34:01 +00:00
parent 91d8ee4be7
commit f2d48add16
4 changed files with 207 additions and 51 deletions

View File

@ -234,6 +234,9 @@ Release 2.4.0 - UNRELEASED
YARN-1765. Added test cases to verify that killApplication API works across YARN-1765. Added test cases to verify that killApplication API works across
ResourceManager failover. (Xuan Gong via vinodkv) ResourceManager failover. (Xuan Gong via vinodkv)
YARN-1730. Implemented simple write-locking in the LevelDB based timeline-
store. (Billie Rinaldi via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -1073,9 +1073,22 @@ public class YarnConfiguration extends Configuration {
public static final String TIMELINE_SERVICE_STORE = public static final String TIMELINE_SERVICE_STORE =
TIMELINE_SERVICE_PREFIX + "store-class"; TIMELINE_SERVICE_PREFIX + "store-class";
public static final String TIMELINE_SERVICE_LEVELDB_PREFIX =
TIMELINE_SERVICE_PREFIX + "leveldb-timeline-store.";
/** Timeline service leveldb path */ /** Timeline service leveldb path */
public static final String TIMELINE_SERVICE_LEVELDB_PATH = public static final String TIMELINE_SERVICE_LEVELDB_PATH =
TIMELINE_SERVICE_PREFIX + "leveldb-timeline-store.path"; TIMELINE_SERVICE_LEVELDB_PREFIX + "path";
/** Timeline service leveldb start time read cache (number of entities) */
public static final String
TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE =
TIMELINE_SERVICE_LEVELDB_PREFIX + "start-time-read-cache-size";
/** Timeline service leveldb start time write cache (number of entities) */
public static final String
TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE =
TIMELINE_SERVICE_LEVELDB_PREFIX + "start-time-write-cache-size";
//////////////////////////////// ////////////////////////////////
// Other Configs // Other Configs

View File

@ -33,6 +33,7 @@ import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.collections.map.LRUMap; import org.apache.commons.collections.map.LRUMap;
@ -84,11 +85,17 @@ public class LeveldbTimelineStore extends AbstractService
private static final byte[] EMPTY_BYTES = new byte[0]; private static final byte[] EMPTY_BYTES = new byte[0];
private static final int START_TIME_CACHE_SIZE = 10000; private static final int DEFAULT_START_TIME_READ_CACHE_SIZE = 10000;
private static final int DEFAULT_START_TIME_WRITE_CACHE_SIZE = 10000;
@SuppressWarnings("unchecked") private Map<EntityIdentifier, Long> startTimeWriteCache;
private final Map<EntityIdentifier, Long> startTimeCache = private Map<EntityIdentifier, Long> startTimeReadCache;
Collections.synchronizedMap(new LRUMap(START_TIME_CACHE_SIZE));
/**
* Per-entity locks are obtained when writing.
*/
private final LockMap<EntityIdentifier> writeLocks =
new LockMap<EntityIdentifier>();
private DB db; private DB db;
@ -97,6 +104,7 @@ public class LeveldbTimelineStore extends AbstractService
} }
@Override @Override
@SuppressWarnings("unchecked")
protected void serviceInit(Configuration conf) throws Exception { protected void serviceInit(Configuration conf) throws Exception {
Options options = new Options(); Options options = new Options();
options.createIfMissing(true); options.createIfMissing(true);
@ -109,6 +117,12 @@ public class LeveldbTimelineStore extends AbstractService
"timeline store " + path); "timeline store " + path);
LOG.info("Using leveldb path " + path); LOG.info("Using leveldb path " + path);
db = factory.open(new File(path, FILENAME), options); db = factory.open(new File(path, FILENAME), options);
startTimeWriteCache =
Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize(
conf)));
startTimeReadCache =
Collections.synchronizedMap(new LRUMap(getStartTimeReadCacheSize(
conf)));
super.serviceInit(conf); super.serviceInit(conf);
} }
@ -118,6 +132,45 @@ public class LeveldbTimelineStore extends AbstractService
super.serviceStop(); super.serviceStop();
} }
/**
 * A map of reference-counted {@link ReentrantLock}s keyed by an arbitrary
 * key type. A lock is created on first {@link #getLock} for a key and
 * discarded once every retrieval has been matched by a
 * {@link #returnLock}, so the map only holds locks that are in use.
 * All map accesses are synchronized on this instance; the returned lock
 * itself must still be locked/unlocked by the caller.
 */
private static class LockMap<K> {
  /**
   * A {@link ReentrantLock} that remembers its key and how many callers
   * currently hold a reference to it (not how many times it is locked).
   */
  private static class CountingReentrantLock<K> extends ReentrantLock {
    // ReentrantLock is Serializable; pin the serial form explicitly.
    private static final long serialVersionUID = 1L;
    // number of outstanding getLock() calls not yet returned
    private int count;
    private final K key;

    CountingReentrantLock(K key) {
      super();
      this.count = 0;
      this.key = key;
    }
  }

  private final Map<K, CountingReentrantLock<K>> locks =
      new HashMap<K, CountingReentrantLock<K>>();

  /**
   * Returns the lock for the given key, creating it if absent, and
   * increments its reference count. Must be paired with
   * {@link #returnLock}.
   */
  synchronized CountingReentrantLock<K> getLock(K key) {
    CountingReentrantLock<K> lock = locks.get(key);
    if (lock == null) {
      lock = new CountingReentrantLock<K>(key);
      locks.put(key, lock);
    }
    lock.count++;
    return lock;
  }

  /**
   * Decrements the lock's reference count, removing it from the map when
   * no caller holds it any longer.
   *
   * @throws IllegalStateException if returned more times than retrieved
   */
  synchronized void returnLock(CountingReentrantLock<K> lock) {
    if (lock.count == 0) {
      throw new IllegalStateException("Returned lock more times than it " +
          "was retrieved");
    }
    lock.count--;
    if (lock.count == 0) {
      locks.remove(lock.key);
    }
  }
}
private static class KeyBuilder { private static class KeyBuilder {
private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10; private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10;
private byte[][] b; private byte[][] b;
@ -214,7 +267,7 @@ public class LeveldbTimelineStore extends AbstractService
EnumSet<Field> fields) throws IOException { EnumSet<Field> fields) throws IOException {
DBIterator iterator = null; DBIterator iterator = null;
try { try {
byte[] revStartTime = getStartTime(entityId, entityType, null, null, null); byte[] revStartTime = getStartTime(entityId, entityType);
if (revStartTime == null) if (revStartTime == null)
return null; return null;
byte[] prefix = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX) byte[] prefix = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
@ -338,7 +391,7 @@ public class LeveldbTimelineStore extends AbstractService
// look up start times for the specified entities // look up start times for the specified entities
// skip entities with no start time // skip entities with no start time
for (String entity : entityIds) { for (String entity : entityIds) {
byte[] startTime = getStartTime(entity, entityType, null, null, null); byte[] startTime = getStartTime(entity, entityType);
if (startTime != null) { if (startTime != null) {
List<EntityIdentifier> entities = startTimeMap.get(startTime); List<EntityIdentifier> entities = startTimeMap.get(startTime);
if (entities == null) { if (entities == null) {
@ -529,12 +582,16 @@ public class LeveldbTimelineStore extends AbstractService
* response. * response.
*/ */
private void put(TimelineEntity entity, TimelinePutResponse response) { private void put(TimelineEntity entity, TimelinePutResponse response) {
LockMap.CountingReentrantLock<EntityIdentifier> lock =
writeLocks.getLock(new EntityIdentifier(entity.getEntityId(),
entity.getEntityType()));
lock.lock();
WriteBatch writeBatch = null; WriteBatch writeBatch = null;
try { try {
writeBatch = db.createWriteBatch(); writeBatch = db.createWriteBatch();
List<TimelineEvent> events = entity.getEvents(); List<TimelineEvent> events = entity.getEvents();
// look up the start time for the entity // look up the start time for the entity
byte[] revStartTime = getStartTime(entity.getEntityId(), byte[] revStartTime = getAndSetStartTime(entity.getEntityId(),
entity.getEntityType(), entity.getStartTime(), events, entity.getEntityType(), entity.getStartTime(), events,
writeBatch); writeBatch);
if (revStartTime == null) { if (revStartTime == null) {
@ -571,7 +628,7 @@ public class LeveldbTimelineStore extends AbstractService
String relatedEntityType = relatedEntityList.getKey(); String relatedEntityType = relatedEntityList.getKey();
for (String relatedEntityId : relatedEntityList.getValue()) { for (String relatedEntityId : relatedEntityList.getValue()) {
// look up start time of related entity // look up start time of related entity
byte[] relatedEntityStartTime = getStartTime(relatedEntityId, byte[] relatedEntityStartTime = getAndSetStartTime(relatedEntityId,
relatedEntityType, null, null, writeBatch); relatedEntityType, null, null, writeBatch);
if (relatedEntityStartTime == null) { if (relatedEntityStartTime == null) {
// if start time is not found, set start time of the related // if start time is not found, set start time of the related
@ -580,7 +637,7 @@ public class LeveldbTimelineStore extends AbstractService
relatedEntityStartTime = revStartTime; relatedEntityStartTime = revStartTime;
writeBatch.put(createStartTimeLookupKey(relatedEntityId, writeBatch.put(createStartTimeLookupKey(relatedEntityId,
relatedEntityType), relatedEntityStartTime); relatedEntityType), relatedEntityStartTime);
startTimeCache.put(new EntityIdentifier(relatedEntityId, startTimeWriteCache.put(new EntityIdentifier(relatedEntityId,
relatedEntityType), revStartTimeLong); relatedEntityType), revStartTimeLong);
} }
// write reverse entry (related entity -> entity) // write reverse entry (related entity -> entity)
@ -629,6 +686,8 @@ public class LeveldbTimelineStore extends AbstractService
error.setErrorCode(TimelinePutError.IO_EXCEPTION); error.setErrorCode(TimelinePutError.IO_EXCEPTION);
response.addError(error); response.addError(error);
} finally { } finally {
lock.unlock();
writeLocks.returnLock(lock);
IOUtils.cleanup(LOG, writeBatch); IOUtils.cleanup(LOG, writeBatch);
} }
} }
@ -666,6 +725,39 @@ public class LeveldbTimelineStore extends AbstractService
* *
* @param entityId The id of the entity * @param entityId The id of the entity
* @param entityType The type of the entity * @param entityType The type of the entity
* @return A byte array
* @throws IOException
*/
/**
 * Looks up the start time for the given entity, first in the read cache
 * and then in the db, caching a db hit for subsequent reads.
 *
 * @param entityId The id of the entity
 * @param entityType The type of the entity
 * @return The reverse-ordered start time bytes, or null if unknown
 * @throws IOException on a db read error
 */
private byte[] getStartTime(String entityId, String entityType)
    throws IOException {
  EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
  // fast path: the read cache never stores null values, so a non-null
  // result means the start time is known
  Long cachedStartTime = startTimeReadCache.get(entity);
  if (cachedStartTime != null) {
    return writeReverseOrderedLong(cachedStartTime);
  }
  // slow path: consult the db directly
  byte[] lookupKey = createStartTimeLookupKey(entity.getId(),
      entity.getType());
  byte[] dbValue = db.get(lookupKey);
  if (dbValue != null) {
    // remember the db hit so the next read avoids the db
    startTimeReadCache.put(entity, readReverseOrderedLong(dbValue, 0));
  }
  // null here means the start time is not stored anywhere
  return dbValue;
}
/**
* Get the unique start time for a given entity as a byte array that sorts
* the timestamps in reverse order (see {@link
* GenericObjectMapper#writeReverseOrderedLong(long)}). If the start time
* doesn't exist, set it based on the information provided.
*
* @param entityId The id of the entity
* @param entityType The type of the entity
* @param startTime The start time of the entity, or null * @param startTime The start time of the entity, or null
* @param events A list of events for the entity, or null * @param events A list of events for the entity, or null
* @param writeBatch A leveldb write batch, if the method is called by a * @param writeBatch A leveldb write batch, if the method is called by a
@ -673,62 +765,76 @@ public class LeveldbTimelineStore extends AbstractService
* @return A byte array * @return A byte array
* @throws IOException * @throws IOException
*/ */
private byte[] getStartTime(String entityId, String entityType, private byte[] getAndSetStartTime(String entityId, String entityType,
Long startTime, List<TimelineEvent> events, WriteBatch writeBatch) Long startTime, List<TimelineEvent> events, WriteBatch writeBatch)
throws IOException { throws IOException {
EntityIdentifier entity = new EntityIdentifier(entityId, entityType); EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
if (startTime == null) { if (startTime == null) {
// start time is not provided, so try to look it up // start time is not provided, so try to look it up
if (startTimeCache.containsKey(entity)) { if (startTimeWriteCache.containsKey(entity)) {
// found the start time in the cache // found the start time in the cache
startTime = startTimeCache.get(entity); startTime = startTimeWriteCache.get(entity);
return writeReverseOrderedLong(startTime);
} else { } else {
// try to look up the start time in the db if (events != null) {
byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); // prepare a start time from events in case it is needed
byte[] v = db.get(b);
if (v == null) {
// did not find the start time in the db
// if this is a put, try to set it from the provided events
if (events == null || writeBatch == null) {
// no events, or not a put, so return null
return null;
}
Long min = Long.MAX_VALUE; Long min = Long.MAX_VALUE;
for (TimelineEvent e : events) for (TimelineEvent e : events) {
if (min > e.getTimestamp()) if (min > e.getTimestamp()) {
min = e.getTimestamp(); min = e.getTimestamp();
startTime = min; }
// selected start time as minimum timestamp of provided events
// write start time to db and cache
writeBatch.put(b, writeReverseOrderedLong(startTime));
startTimeCache.put(entity, startTime);
} else {
// found the start time in the db
startTime = readReverseOrderedLong(v, 0);
if (writeBatch != null) {
// if this is a put, re-add the start time to the cache
startTimeCache.put(entity, startTime);
} }
startTime = min;
} }
return checkStartTimeInDb(entity, startTime, writeBatch);
} }
} else { } else {
// start time is provided // start time is provided
// TODO: verify start time in db as well as cache? if (startTimeWriteCache.containsKey(entity)) {
if (startTimeCache.containsKey(entity)) { // check the provided start time matches the cache
// if the start time is already in the cache, if (!startTime.equals(startTimeWriteCache.get(entity))) {
// and it is different from the provided start time, // the start time is already in the cache,
// use the one from the cache // and it is different from the provided start time,
if (!startTime.equals(startTimeCache.get(entity))) // so use the one from the cache
startTime = startTimeCache.get(entity); startTime = startTimeWriteCache.get(entity);
} else if (writeBatch != null) { }
// if this is a put, write the provided start time to the db and the return writeReverseOrderedLong(startTime);
// cache } else {
byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); // check the provided start time matches the db
writeBatch.put(b, writeReverseOrderedLong(startTime)); return checkStartTimeInDb(entity, startTime, writeBatch);
startTimeCache.put(entity, startTime);
} }
} }
return writeReverseOrderedLong(startTime); }
/**
 * Checks the db for the start time of the given entity and returns it if
 * it exists; otherwise writes the suggested start time to the db (when
 * non-null). This is only called when the start time was not found in the
 * write cache, so on success the time is (re-)added to both caches.
 *
 * @param entity the entity whose start time is being resolved
 * @param suggestedStartTime the start time to persist if none is stored,
 *          or null to only perform the lookup
 * @param writeBatch the batch the new start time is written to; only
 *          dereferenced when suggestedStartTime is non-null — NOTE(review):
 *          callers on the put path always pass a non-null batch, verify
 *          before reusing elsewhere
 * @return the start time as reverse-ordered bytes, or null if it neither
 *         existed nor could be set
 * @throws IOException on a db read error
 */
private byte[] checkStartTimeInDb(EntityIdentifier entity,
Long suggestedStartTime, WriteBatch writeBatch) throws IOException {
// create lookup key for start time
byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
// retrieve value for key
byte[] v = db.get(b);
byte[] revStartTime;
if (v == null) {
// start time doesn't exist in db
if (suggestedStartTime == null) {
// nothing stored and nothing to store
return null;
}
// write suggested start time
revStartTime = writeReverseOrderedLong(suggestedStartTime);
writeBatch.put(b, revStartTime);
} else {
// found start time in db, so ignore suggested start time
suggestedStartTime = readReverseOrderedLong(v, 0);
revStartTime = v;
}
// populate both caches with whichever time won
startTimeWriteCache.put(entity, suggestedStartTime);
startTimeReadCache.put(entity, suggestedStartTime);
return revStartTime;
} }
/** /**
@ -868,6 +974,21 @@ public class LeveldbTimelineStore extends AbstractService
*/ */
@VisibleForTesting @VisibleForTesting
void clearStartTimeCache() { void clearStartTimeCache() {
startTimeCache.clear(); startTimeWriteCache.clear();
startTimeReadCache.clear();
}
/**
 * Returns the configured size (in entities) of the start-time read cache,
 * falling back to the built-in default when unset.
 */
@VisibleForTesting
static int getStartTimeReadCacheSize(Configuration conf) {
  final String key =
      YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE;
  return conf.getInt(key, DEFAULT_START_TIME_READ_CACHE_SIZE);
}
/**
 * Returns the configured size (in entities) of the start-time write cache,
 * falling back to the built-in default when unset.
 */
@VisibleForTesting
static int getStartTimeWriteCacheSize(Configuration conf) {
return conf.getInt(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
DEFAULT_START_TIME_WRITE_CACHE_SIZE);
} }
} }

View File

@ -30,6 +30,8 @@ import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertEquals;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class TestLeveldbTimelineStore public class TestLeveldbTimelineStore
@ -64,6 +66,7 @@ public class TestLeveldbTimelineStore
super.testGetSingleEntity(); super.testGetSingleEntity();
((LeveldbTimelineStore)store).clearStartTimeCache(); ((LeveldbTimelineStore)store).clearStartTimeCache();
super.testGetSingleEntity(); super.testGetSingleEntity();
loadTestData();
} }
@Test @Test
@ -86,4 +89,20 @@ public class TestLeveldbTimelineStore
super.testGetEvents(); super.testGetEvents();
} }
@Test
public void testCacheSizes() {
  // an untouched configuration yields the built-in defaults
  Configuration config = new Configuration();
  assertEquals(10000, LeveldbTimelineStore.getStartTimeReadCacheSize(config));
  assertEquals(10000,
      LeveldbTimelineStore.getStartTimeWriteCacheSize(config));
  // an explicit read-cache size overrides the default
  config.setInt(
      YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
      10001);
  assertEquals(10001,
      LeveldbTimelineStore.getStartTimeReadCacheSize(config));
  // an explicit write-cache size overrides the default
  config = new Configuration();
  config.setInt(
      YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
      10002);
  assertEquals(10002,
      LeveldbTimelineStore.getStartTimeWriteCacheSize(config));
}
} }