YARN-5340. Fixed a race condition in RollingLevelDBTimelineStore that caused loss of Timeline events. Contributed by Li Lu.

(cherry picked from commit 1c9d2ab503)
This commit is contained in:
Vinod Kumar Vavilapalli 2016-07-20 08:36:36 -07:00
parent c177823ebe
commit 8c5101ea6d
1 changed files with 20 additions and 19 deletions

View File

@ -1243,8 +1243,7 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
* Get the unique start time for a given entity as a byte array that sorts the * Get the unique start time for a given entity as a byte array that sorts the
* timestamps in reverse order (see * timestamps in reverse order (see
* {@link GenericObjectMapper#writeReverseOrderedLong(long)}). If the start * {@link GenericObjectMapper#writeReverseOrderedLong(long)}). If the start
* time doesn't exist, set it based on the information provided. Should only * time doesn't exist, set it based on the information provided.
* be called when a lock has been obtained on the entity.
* *
* @param entityId * @param entityId
* The id of the entity * The id of the entity
@ -1257,8 +1256,9 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
* @return A StartAndInsertTime * @return A StartAndInsertTime
* @throws IOException * @throws IOException
*/ */
private Long getAndSetStartTime(String entityId, String entityType, private Long getAndSetStartTime(String entityId,
Long startTime, List<TimelineEvent> events) throws IOException { String entityType, Long startTime, List<TimelineEvent> events)
throws IOException {
EntityIdentifier entity = new EntityIdentifier(entityId, entityType); EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
Long time = startTimeWriteCache.get(entity); Long time = startTimeWriteCache.get(entity);
if (time != null) { if (time != null) {
@ -1282,28 +1282,29 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
* Checks db for start time and returns it if it exists. If it doesn't exist, * Checks db for start time and returns it if it exists. If it doesn't exist,
* writes the suggested start time (if it is not null). This is only called * writes the suggested start time (if it is not null). This is only called
* when the start time is not found in the cache, so it adds it back into the * when the start time is not found in the cache, so it adds it back into the
* cache if it is found. Should only be called when a lock has been obtained * cache if it is found.
* on the entity.
*/ */
private Long checkStartTimeInDb(EntityIdentifier entity, private Long checkStartTimeInDb(EntityIdentifier entity,
Long suggestedStartTime) throws IOException { Long suggestedStartTime) throws IOException {
Long startAndInsertTime = null; Long startAndInsertTime = null;
// create lookup key for start time // create lookup key for start time
byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
// retrieve value for key synchronized (this) {
byte[] v = starttimedb.get(b); // retrieve value for key
if (v == null) { byte[] v = starttimedb.get(b);
// start time doesn't exist in db if (v == null) {
if (suggestedStartTime == null) { // start time doesn't exist in db
return null; if (suggestedStartTime == null) {
} return null;
startAndInsertTime = suggestedStartTime; }
startAndInsertTime = suggestedStartTime;
// write suggested start time // write suggested start time
starttimedb.put(b, writeReverseOrderedLong(suggestedStartTime)); starttimedb.put(b, writeReverseOrderedLong(suggestedStartTime));
} else { } else {
// found start time in db, so ignore suggested start time // found start time in db, so ignore suggested start time
startAndInsertTime = readReverseOrderedLong(v, 0); startAndInsertTime = readReverseOrderedLong(v, 0);
}
} }
startTimeWriteCache.put(entity, startAndInsertTime); startTimeWriteCache.put(entity, startAndInsertTime);
startTimeReadCache.put(entity, startAndInsertTime); startTimeReadCache.put(entity, startAndInsertTime);