YARN-5340. Fixed a race condition in RollingLevelDBTimelineStore that caused loss of Timeline events. Contributed by Li Lu.
This commit is contained in:
parent
37362c2f92
commit
1c9d2ab503
|
@ -1243,8 +1243,7 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
|
||||||
* Get the unique start time for a given entity as a byte array that sorts the
|
* Get the unique start time for a given entity as a byte array that sorts the
|
||||||
* timestamps in reverse order (see
|
* timestamps in reverse order (see
|
||||||
* {@link GenericObjectMapper#writeReverseOrderedLong(long)}). If the start
|
* {@link GenericObjectMapper#writeReverseOrderedLong(long)}). If the start
|
||||||
* time doesn't exist, set it based on the information provided. Should only
|
* time doesn't exist, set it based on the information provided.
|
||||||
* be called when a lock has been obtained on the entity.
|
|
||||||
*
|
*
|
||||||
* @param entityId
|
* @param entityId
|
||||||
* The id of the entity
|
* The id of the entity
|
||||||
|
@ -1257,8 +1256,9 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
|
||||||
* @return A StartAndInsertTime
|
* @return A StartAndInsertTime
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private Long getAndSetStartTime(String entityId, String entityType,
|
private Long getAndSetStartTime(String entityId,
|
||||||
Long startTime, List<TimelineEvent> events) throws IOException {
|
String entityType, Long startTime, List<TimelineEvent> events)
|
||||||
|
throws IOException {
|
||||||
EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
|
EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
|
||||||
Long time = startTimeWriteCache.get(entity);
|
Long time = startTimeWriteCache.get(entity);
|
||||||
if (time != null) {
|
if (time != null) {
|
||||||
|
@ -1282,28 +1282,29 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
|
||||||
* Checks db for start time and returns it if it exists. If it doesn't exist,
|
* Checks db for start time and returns it if it exists. If it doesn't exist,
|
||||||
* writes the suggested start time (if it is not null). This is only called
|
* writes the suggested start time (if it is not null). This is only called
|
||||||
* when the start time is not found in the cache, so it adds it back into the
|
* when the start time is not found in the cache, so it adds it back into the
|
||||||
* cache if it is found. Should only be called when a lock has been obtained
|
* cache if it is found.
|
||||||
* on the entity.
|
|
||||||
*/
|
*/
|
||||||
private Long checkStartTimeInDb(EntityIdentifier entity,
|
private Long checkStartTimeInDb(EntityIdentifier entity,
|
||||||
Long suggestedStartTime) throws IOException {
|
Long suggestedStartTime) throws IOException {
|
||||||
Long startAndInsertTime = null;
|
Long startAndInsertTime = null;
|
||||||
// create lookup key for start time
|
// create lookup key for start time
|
||||||
byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
|
byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
|
||||||
// retrieve value for key
|
synchronized (this) {
|
||||||
byte[] v = starttimedb.get(b);
|
// retrieve value for key
|
||||||
if (v == null) {
|
byte[] v = starttimedb.get(b);
|
||||||
// start time doesn't exist in db
|
if (v == null) {
|
||||||
if (suggestedStartTime == null) {
|
// start time doesn't exist in db
|
||||||
return null;
|
if (suggestedStartTime == null) {
|
||||||
}
|
return null;
|
||||||
startAndInsertTime = suggestedStartTime;
|
}
|
||||||
|
startAndInsertTime = suggestedStartTime;
|
||||||
|
|
||||||
// write suggested start time
|
// write suggested start time
|
||||||
starttimedb.put(b, writeReverseOrderedLong(suggestedStartTime));
|
starttimedb.put(b, writeReverseOrderedLong(suggestedStartTime));
|
||||||
} else {
|
} else {
|
||||||
// found start time in db, so ignore suggested start time
|
// found start time in db, so ignore suggested start time
|
||||||
startAndInsertTime = readReverseOrderedLong(v, 0);
|
startAndInsertTime = readReverseOrderedLong(v, 0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
startTimeWriteCache.put(entity, startAndInsertTime);
|
startTimeWriteCache.put(entity, startAndInsertTime);
|
||||||
startTimeReadCache.put(entity, startAndInsertTime);
|
startTimeReadCache.put(entity, startAndInsertTime);
|
||||||
|
|
Loading…
Reference in New Issue