YARN-5368. Memory leak in timeline server (Jonathan Eagles via Varun Saxena)

This commit is contained in:
Varun Saxena 2017-03-29 01:53:20 +05:30
parent 6b09336438
commit 01aca54a22
1 changed files with 117 additions and 146 deletions

View File

@ -275,9 +275,7 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
Path domainDBPath = new Path(dbPath, DOMAIN); Path domainDBPath = new Path(dbPath, DOMAIN);
Path starttimeDBPath = new Path(dbPath, STARTTIME); Path starttimeDBPath = new Path(dbPath, STARTTIME);
Path ownerDBPath = new Path(dbPath, OWNER); Path ownerDBPath = new Path(dbPath, OWNER);
FileSystem localFS = null; try (FileSystem localFS = FileSystem.getLocal(conf)) {
try {
localFS = FileSystem.getLocal(conf);
if (!localFS.exists(dbPath)) { if (!localFS.exists(dbPath)) {
if (!localFS.mkdirs(dbPath)) { if (!localFS.mkdirs(dbPath)) {
throw new IOException("Couldn't create directory for leveldb " throw new IOException("Couldn't create directory for leveldb "
@ -306,8 +304,6 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
} }
localFS.setPermission(ownerDBPath, LEVELDB_DIR_UMASK); localFS.setPermission(ownerDBPath, LEVELDB_DIR_UMASK);
} }
} finally {
IOUtils.cleanup(LOG, localFS);
} }
options.maxOpenFiles(conf.getInt( options.maxOpenFiles(conf.getInt(
TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES, TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES,
@ -408,19 +404,15 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
.add(writeReverseOrderedLong(revStartTime)).add(entityId) .add(writeReverseOrderedLong(revStartTime)).add(entityId)
.getBytesForLookup(); .getBytesForLookup();
DBIterator iterator = null; DB db = entitydb.getDBForStartTime(revStartTime);
try { if (db == null) {
DB db = entitydb.getDBForStartTime(revStartTime); return null;
if (db == null) { }
return null; try (DBIterator iterator = db.iterator()) {
}
iterator = db.iterator();
iterator.seek(prefix); iterator.seek(prefix);
return getEntity(entityId, entityType, revStartTime, fields, iterator, return getEntity(entityId, entityType, revStartTime, fields, iterator,
prefix, prefix.length); prefix, prefix.length);
} finally {
IOUtils.cleanup(LOG, iterator);
} }
} }
@ -533,62 +525,61 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
o2.length); o2.length);
} }
}); });
DBIterator iterator = null;
try {
// look up start times for the specified entities // look up start times for the specified entities
// skip entities with no start time // skip entities with no start time
for (String entityId : entityIds) { for (String entityId : entityIds) {
byte[] startTime = getStartTime(entityId, entityType); byte[] startTime = getStartTime(entityId, entityType);
if (startTime != null) { if (startTime != null) {
List<EntityIdentifier> entities = startTimeMap.get(startTime); List<EntityIdentifier> entities = startTimeMap.get(startTime);
if (entities == null) { if (entities == null) {
entities = new ArrayList<EntityIdentifier>(); entities = new ArrayList<EntityIdentifier>();
startTimeMap.put(startTime, entities); startTimeMap.put(startTime, entities);
}
entities.add(new EntityIdentifier(entityId, entityType));
} }
entities.add(new EntityIdentifier(entityId, entityType));
} }
for (Entry<byte[], List<EntityIdentifier>> entry : startTimeMap }
for (Entry<byte[], List<EntityIdentifier>> entry : startTimeMap
.entrySet()) { .entrySet()) {
// look up the events matching the given parameters (limit, // look up the events matching the given parameters (limit,
// start time, end time, event types) for entities whose start times // start time, end time, event types) for entities whose start times
// were found and add the entities to the return list // were found and add the entities to the return list
byte[] revStartTime = entry.getKey(); byte[] revStartTime = entry.getKey();
for (EntityIdentifier entityIdentifier : entry.getValue()) { for (EntityIdentifier entityIdentifier : entry.getValue()) {
EventsOfOneEntity entity = new EventsOfOneEntity(); EventsOfOneEntity entity = new EventsOfOneEntity();
entity.setEntityId(entityIdentifier.getId()); entity.setEntityId(entityIdentifier.getId());
entity.setEntityType(entityType); entity.setEntityType(entityType);
events.addEvent(entity); events.addEvent(entity);
KeyBuilder kb = KeyBuilder.newInstance().add(entityType) KeyBuilder kb = KeyBuilder.newInstance().add(entityType)
.add(revStartTime).add(entityIdentifier.getId()) .add(revStartTime).add(entityIdentifier.getId())
.add(EVENTS_COLUMN); .add(EVENTS_COLUMN);
byte[] prefix = kb.getBytesForLookup(); byte[] prefix = kb.getBytesForLookup();
if (windowEnd == null) { if (windowEnd == null) {
windowEnd = Long.MAX_VALUE; windowEnd = Long.MAX_VALUE;
} }
byte[] revts = writeReverseOrderedLong(windowEnd); byte[] revts = writeReverseOrderedLong(windowEnd);
kb.add(revts); kb.add(revts);
byte[] first = kb.getBytesForLookup(); byte[] first = kb.getBytesForLookup();
byte[] last = null; byte[] last = null;
if (windowStart != null) { if (windowStart != null) {
last = KeyBuilder.newInstance().add(prefix) last = KeyBuilder.newInstance().add(prefix)
.add(writeReverseOrderedLong(windowStart)).getBytesForLookup(); .add(writeReverseOrderedLong(windowStart)).getBytesForLookup();
} }
if (limit == null) { if (limit == null) {
limit = DEFAULT_LIMIT; limit = DEFAULT_LIMIT;
} }
DB db = entitydb.getDBForStartTime(readReverseOrderedLong( DB db = entitydb.getDBForStartTime(readReverseOrderedLong(
revStartTime, 0)); revStartTime, 0));
if (db == null) { if (db == null) {
continue; continue;
} }
iterator = db.iterator(); try (DBIterator iterator = db.iterator()) {
for (iterator.seek(first); entity.getEvents().size() < limit for (iterator.seek(first); entity.getEvents().size() < limit
&& iterator.hasNext(); iterator.next()) { && iterator.hasNext(); iterator.next()) {
byte[] key = iterator.peekNext().getKey(); byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefix.length, key) if (!prefixMatches(prefix, prefix.length, key)
|| (last != null && WritableComparator.compareBytes(key, 0, || (last != null && WritableComparator.compareBytes(key, 0,
key.length, last, 0, last.length) > 0)) { key.length, last, 0, last.length) > 0)) {
break; break;
} }
TimelineEvent event = getEntityEvent(eventType, key, prefix.length, TimelineEvent event = getEntityEvent(eventType, key, prefix.length,
@ -599,8 +590,6 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
} }
} }
} }
} finally {
IOUtils.cleanup(LOG, iterator);
} }
return events; return events;
} }
@ -657,66 +646,64 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
Long limit, Long starttime, Long endtime, String fromId, Long fromTs, Long limit, Long starttime, Long endtime, String fromId, Long fromTs,
Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields, Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields,
CheckAcl checkAcl, boolean usingPrimaryFilter) throws IOException { CheckAcl checkAcl, boolean usingPrimaryFilter) throws IOException {
DBIterator iterator = null; KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
try { // only db keys matching the prefix (base + entity type) will be parsed
KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType); byte[] prefix = kb.getBytesForLookup();
// only db keys matching the prefix (base + entity type) will be parsed if (endtime == null) {
byte[] prefix = kb.getBytesForLookup(); // if end time is null, place no restriction on end time
if (endtime == null) { endtime = Long.MAX_VALUE;
// if end time is null, place no restriction on end time }
endtime = Long.MAX_VALUE;
}
// Sanitize the fields parameter // Sanitize the fields parameter
if (fields == null) { if (fields == null) {
fields = EnumSet.allOf(Field.class); fields = EnumSet.allOf(Field.class);
} }
// construct a first key that will be seeked to using end time or fromId // construct a first key that will be seeked to using end time or fromId
long firstStartTime = Long.MAX_VALUE; long firstStartTime = Long.MAX_VALUE;
byte[] first = null; byte[] first = null;
if (fromId != null) { if (fromId != null) {
Long fromIdStartTime = getStartTimeLong(fromId, entityType); Long fromIdStartTime = getStartTimeLong(fromId, entityType);
if (fromIdStartTime == null) { if (fromIdStartTime == null) {
// no start time for provided id, so return empty entities // no start time for provided id, so return empty entities
return new TimelineEntities(); return new TimelineEntities();
}
if (fromIdStartTime <= endtime) {
// if provided id's start time falls before the end of the window,
// use it to construct the seek key
firstStartTime = fromIdStartTime;
first = kb.add(writeReverseOrderedLong(fromIdStartTime)).add(fromId)
.getBytesForLookup();
}
} }
// if seek key wasn't constructed using fromId, construct it using end ts if (fromIdStartTime <= endtime) {
if (first == null) { // if provided id's start time falls before the end of the window,
firstStartTime = endtime; // use it to construct the seek key
first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup(); firstStartTime = fromIdStartTime;
} first = kb.add(writeReverseOrderedLong(fromIdStartTime)).add(fromId)
byte[] last = null; .getBytesForLookup();
if (starttime != null) {
// if start time is not null, set a last key that will not be
// iterated past
last = KeyBuilder.newInstance().add(base).add(entityType)
.add(writeReverseOrderedLong(starttime)).getBytesForLookup();
}
if (limit == null) {
// if limit is not specified, use the default
limit = DEFAULT_LIMIT;
} }
}
// if seek key wasn't constructed using fromId, construct it using end ts
if (first == null) {
firstStartTime = endtime;
first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup();
}
byte[] last = null;
if (starttime != null) {
// if start time is not null, set a last key that will not be
// iterated past
last = KeyBuilder.newInstance().add(base).add(entityType)
.add(writeReverseOrderedLong(starttime)).getBytesForLookup();
}
if (limit == null) {
// if limit is not specified, use the default
limit = DEFAULT_LIMIT;
}
TimelineEntities entities = new TimelineEntities(); TimelineEntities entities = new TimelineEntities();
RollingLevelDB rollingdb = null; RollingLevelDB rollingdb = null;
if (usingPrimaryFilter) { if (usingPrimaryFilter) {
rollingdb = indexdb; rollingdb = indexdb;
} else { } else {
rollingdb = entitydb; rollingdb = entitydb;
} }
DB db = rollingdb.getDBForStartTime(firstStartTime); DB db = rollingdb.getDBForStartTime(firstStartTime);
while (entities.getEntities().size() < limit && db != null) { while (entities.getEntities().size() < limit && db != null) {
iterator = db.iterator(); try (DBIterator iterator = db.iterator()) {
iterator.seek(first); iterator.seek(first);
// iterate until one of the following conditions is met: limit is // iterate until one of the following conditions is met: limit is
@ -726,7 +713,7 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
byte[] key = iterator.peekNext().getKey(); byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefix.length, key) if (!prefixMatches(prefix, prefix.length, key)
|| (last != null && WritableComparator.compareBytes(key, 0, || (last != null && WritableComparator.compareBytes(key, 0,
key.length, last, 0, last.length) > 0)) { key.length, last, 0, last.length) > 0)) {
break; break;
} }
// read the start time and entity id from the current key // read the start time and entity id from the current key
@ -814,10 +801,8 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
} }
db = rollingdb.getPreviousDB(db); db = rollingdb.getPreviousDB(db);
} }
return entities;
} finally {
IOUtils.cleanup(LOG, iterator);
} }
return entities;
} }
/** /**
@ -1459,15 +1444,14 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
long startTimesCount = 0; long startTimesCount = 0;
WriteBatch writeBatch = null; WriteBatch writeBatch = null;
DBIterator iterator = null;
try { ReadOptions readOptions = new ReadOptions();
writeBatch = starttimedb.createWriteBatch(); readOptions.fillCache(false);
ReadOptions readOptions = new ReadOptions(); try (DBIterator iterator = starttimedb.iterator(readOptions)) {
readOptions.fillCache(false);
iterator = starttimedb.iterator(readOptions);
// seek to the first start time entry // seek to the first start time entry
iterator.seekToFirst(); iterator.seekToFirst();
writeBatch = starttimedb.createWriteBatch();
// evaluate each start time entry to see if it needs to be evicted or not // evaluate each start time entry to see if it needs to be evicted or not
while (iterator.hasNext()) { while (iterator.hasNext()) {
@ -1513,7 +1497,6 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
+ " start time entities earlier than " + minStartTime); + " start time entities earlier than " + minStartTime);
} finally { } finally {
IOUtils.cleanup(LOG, writeBatch); IOUtils.cleanup(LOG, writeBatch);
IOUtils.cleanup(LOG, iterator);
} }
return startTimesCount; return startTimesCount;
} }
@ -1598,11 +1581,9 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
// TODO: make data retention work with the domain data as well // TODO: make data retention work with the domain data as well
@Override @Override
public void put(TimelineDomain domain) throws IOException { public void put(TimelineDomain domain) throws IOException {
WriteBatch domainWriteBatch = null; try (WriteBatch domainWriteBatch = domaindb.createWriteBatch();
WriteBatch ownerWriteBatch = null; WriteBatch ownerWriteBatch = ownerdb.createWriteBatch();) {
try {
domainWriteBatch = domaindb.createWriteBatch();
ownerWriteBatch = ownerdb.createWriteBatch();
if (domain.getId() == null || domain.getId().length() == 0) { if (domain.getId() == null || domain.getId().length() == 0) {
throw new IllegalArgumentException("Domain doesn't have an ID"); throw new IllegalArgumentException("Domain doesn't have an ID");
} }
@ -1682,9 +1663,6 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
ownerWriteBatch.put(ownerLookupEntryKey, timestamps); ownerWriteBatch.put(ownerLookupEntryKey, timestamps);
domaindb.write(domainWriteBatch); domaindb.write(domainWriteBatch);
ownerdb.write(ownerWriteBatch); ownerdb.write(ownerWriteBatch);
} finally {
IOUtils.cleanup(LOG, domainWriteBatch);
IOUtils.cleanup(LOG, ownerWriteBatch);
} }
} }
@ -1709,26 +1687,21 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
@Override @Override
public TimelineDomain getDomain(String domainId) throws IOException { public TimelineDomain getDomain(String domainId) throws IOException {
DBIterator iterator = null; try (DBIterator iterator = domaindb.iterator()) {
try {
byte[] prefix = KeyBuilder.newInstance().add(domainId) byte[] prefix = KeyBuilder.newInstance().add(domainId)
.getBytesForLookup(); .getBytesForLookup();
iterator = domaindb.iterator();
iterator.seek(prefix); iterator.seek(prefix);
return getTimelineDomain(iterator, domainId, prefix); return getTimelineDomain(iterator, domainId, prefix);
} finally {
IOUtils.cleanup(LOG, iterator);
} }
} }
@Override @Override
public TimelineDomains getDomains(String owner) throws IOException { public TimelineDomains getDomains(String owner) throws IOException {
DBIterator iterator = null; try (DBIterator iterator = ownerdb.iterator()) {
try {
byte[] prefix = KeyBuilder.newInstance().add(owner).getBytesForLookup(); byte[] prefix = KeyBuilder.newInstance().add(owner).getBytesForLookup();
iterator.seek(prefix);
List<TimelineDomain> domains = new ArrayList<TimelineDomain>(); List<TimelineDomain> domains = new ArrayList<TimelineDomain>();
for (iterator = ownerdb.iterator(), iterator.seek(prefix); iterator while (iterator.hasNext()) {
.hasNext();) {
byte[] key = iterator.peekNext().getKey(); byte[] key = iterator.peekNext().getKey();
if (!prefixMatches(prefix, prefix.length, key)) { if (!prefixMatches(prefix, prefix.length, key)) {
break; break;
@ -1761,8 +1734,6 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
TimelineDomains domainsToReturn = new TimelineDomains(); TimelineDomains domainsToReturn = new TimelineDomains();
domainsToReturn.addDomains(domains); domainsToReturn.addDomains(domains);
return domainsToReturn; return domainsToReturn;
} finally {
IOUtils.cleanup(LOG, iterator);
} }
} }