From 5e751f33678558b02b3d45f07713fa5027d3010c Mon Sep 17 00:00:00 2001 From: Varun Saxena Date: Wed, 29 Mar 2017 01:53:20 +0530 Subject: [PATCH] YARN-5368. Memory leak in timeline server (Jonathan Eagles via Varun Saxena) (cherry picked from commit 01aca54a22c8586d232a8f79fe9977aeb8d09b83) --- .../timeline/RollingLevelDBTimelineStore.java | 263 ++++++++---------- 1 file changed, 117 insertions(+), 146 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java index 4d38008c993..20e0379ac62 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java @@ -275,9 +275,7 @@ public class RollingLevelDBTimelineStore extends AbstractService implements Path domainDBPath = new Path(dbPath, DOMAIN); Path starttimeDBPath = new Path(dbPath, STARTTIME); Path ownerDBPath = new Path(dbPath, OWNER); - FileSystem localFS = null; - try { - localFS = FileSystem.getLocal(conf); + try (FileSystem localFS = FileSystem.getLocal(conf)) { if (!localFS.exists(dbPath)) { if (!localFS.mkdirs(dbPath)) { throw new IOException("Couldn't create directory for leveldb " @@ -306,8 +304,6 @@ public class RollingLevelDBTimelineStore extends AbstractService implements } localFS.setPermission(ownerDBPath, LEVELDB_DIR_UMASK); } - } finally { - IOUtils.cleanup(LOG, localFS); } options.maxOpenFiles(conf.getInt( TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES, @@ -408,19 +404,15 @@ public class RollingLevelDBTimelineStore extends AbstractService implements .add(writeReverseOrderedLong(revStartTime)).add(entityId) .getBytesForLookup(); - DBIterator iterator = null; - try { - DB db = entitydb.getDBForStartTime(revStartTime); - if (db == null) { - return null; - } - iterator = db.iterator(); + DB db = entitydb.getDBForStartTime(revStartTime); + if (db == null) { + return null; + } + try (DBIterator iterator = db.iterator()) { iterator.seek(prefix); return getEntity(entityId, entityType, revStartTime, fields, iterator, prefix, prefix.length); - } finally { - IOUtils.cleanup(LOG, iterator); } } @@ -533,62 +525,61 @@ public class RollingLevelDBTimelineStore extends AbstractService implements o2.length); } }); - DBIterator iterator = null; - try { + // look up start times for the specified entities // skip entities with no start time - for (String entityId : entityIds) { - byte[] startTime = getStartTime(entityId, entityType); - if (startTime != null) { - List entities = startTimeMap.get(startTime); - if (entities == null) { - entities = new ArrayList(); - startTimeMap.put(startTime, entities); - } - entities.add(new EntityIdentifier(entityId, entityType)); + for (String entityId : entityIds) { + byte[] startTime = getStartTime(entityId, entityType); + if (startTime != null) { + List entities = startTimeMap.get(startTime); + if (entities == null) { + entities = new ArrayList(); + startTimeMap.put(startTime, entities); } + entities.add(new EntityIdentifier(entityId, entityType)); } - for (Entry> entry : startTimeMap + } + for (Entry> entry : startTimeMap .entrySet()) { - // look up the events matching the given parameters (limit, - // start time, end time, event types) for entities whose start times - // were found and add the entities to the return list - byte[] revStartTime = entry.getKey(); - for (EntityIdentifier entityIdentifier : entry.getValue()) { - EventsOfOneEntity entity = new EventsOfOneEntity(); - entity.setEntityId(entityIdentifier.getId()); - entity.setEntityType(entityType); - events.addEvent(entity); - KeyBuilder kb = KeyBuilder.newInstance().add(entityType) - .add(revStartTime).add(entityIdentifier.getId()) - .add(EVENTS_COLUMN); - byte[] prefix = kb.getBytesForLookup(); - if (windowEnd == null) { - windowEnd = Long.MAX_VALUE; - } - byte[] revts = writeReverseOrderedLong(windowEnd); - kb.add(revts); - byte[] first = kb.getBytesForLookup(); - byte[] last = null; - if (windowStart != null) { - last = KeyBuilder.newInstance().add(prefix) - .add(writeReverseOrderedLong(windowStart)).getBytesForLookup(); - } - if (limit == null) { - limit = DEFAULT_LIMIT; - } - DB db = entitydb.getDBForStartTime(readReverseOrderedLong( - revStartTime, 0)); - if (db == null) { - continue; - } - iterator = db.iterator(); + // look up the events matching the given parameters (limit, + // start time, end time, event types) for entities whose start times + // were found and add the entities to the return list + byte[] revStartTime = entry.getKey(); + for (EntityIdentifier entityIdentifier : entry.getValue()) { + EventsOfOneEntity entity = new EventsOfOneEntity(); + entity.setEntityId(entityIdentifier.getId()); + entity.setEntityType(entityType); + events.addEvent(entity); + KeyBuilder kb = KeyBuilder.newInstance().add(entityType) + .add(revStartTime).add(entityIdentifier.getId()) + .add(EVENTS_COLUMN); + byte[] prefix = kb.getBytesForLookup(); + if (windowEnd == null) { + windowEnd = Long.MAX_VALUE; + } + byte[] revts = writeReverseOrderedLong(windowEnd); + kb.add(revts); + byte[] first = kb.getBytesForLookup(); + byte[] last = null; + if (windowStart != null) { + last = KeyBuilder.newInstance().add(prefix) + .add(writeReverseOrderedLong(windowStart)).getBytesForLookup(); + } + if (limit == null) { + limit = DEFAULT_LIMIT; + } + DB db = entitydb.getDBForStartTime(readReverseOrderedLong( + revStartTime, 0)); + if (db == null) { + continue; + } + try (DBIterator iterator = db.iterator()) { for (iterator.seek(first); entity.getEvents().size() < limit && iterator.hasNext(); iterator.next()) { byte[] key = iterator.peekNext().getKey(); if (!prefixMatches(prefix, prefix.length, key) || (last != null && WritableComparator.compareBytes(key, 0, - key.length, last, 0, last.length) > 0)) { + key.length, last, 0, last.length) > 0)) { break; } TimelineEvent event = getEntityEvent(eventType, key, prefix.length, @@ -599,8 +590,6 @@ public class RollingLevelDBTimelineStore extends AbstractService implements } } } - } finally { - IOUtils.cleanup(LOG, iterator); } return events; } @@ -657,66 +646,64 @@ public class RollingLevelDBTimelineStore extends AbstractService implements Long limit, Long starttime, Long endtime, String fromId, Long fromTs, Collection secondaryFilters, EnumSet fields, CheckAcl checkAcl, boolean usingPrimaryFilter) throws IOException { - DBIterator iterator = null; - try { - KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType); - // only db keys matching the prefix (base + entity type) will be parsed - byte[] prefix = kb.getBytesForLookup(); - if (endtime == null) { - // if end time is null, place no restriction on end time - endtime = Long.MAX_VALUE; - } + KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType); + // only db keys matching the prefix (base + entity type) will be parsed + byte[] prefix = kb.getBytesForLookup(); + if (endtime == null) { + // if end time is null, place no restriction on end time + endtime = Long.MAX_VALUE; + } - // Sanitize the fields parameter - if (fields == null) { - fields = EnumSet.allOf(Field.class); - } + // Sanitize the fields parameter + if (fields == null) { + fields = EnumSet.allOf(Field.class); + } - // construct a first key that will be seeked to using end time or fromId - long firstStartTime = Long.MAX_VALUE; - byte[] first = null; - if (fromId != null) { - Long fromIdStartTime = getStartTimeLong(fromId, entityType); - if (fromIdStartTime == null) { - // no start time for provided id, so return empty entities - return new TimelineEntities(); - } - if (fromIdStartTime <= endtime) { - // if provided id's start time falls before the end of the window, - // use it to construct the seek key - firstStartTime = fromIdStartTime; - first = kb.add(writeReverseOrderedLong(fromIdStartTime)).add(fromId) - .getBytesForLookup(); - } + // construct a first key that will be seeked to using end time or fromId + long firstStartTime = Long.MAX_VALUE; + byte[] first = null; + if (fromId != null) { + Long fromIdStartTime = getStartTimeLong(fromId, entityType); + if (fromIdStartTime == null) { + // no start time for provided id, so return empty entities + return new TimelineEntities(); } - // if seek key wasn't constructed using fromId, construct it using end ts - if (first == null) { - firstStartTime = endtime; - first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup(); - } - byte[] last = null; - if (starttime != null) { - // if start time is not null, set a last key that will not be - // iterated past - last = KeyBuilder.newInstance().add(base).add(entityType) - .add(writeReverseOrderedLong(starttime)).getBytesForLookup(); - } - if (limit == null) { - // if limit is not specified, use the default - limit = DEFAULT_LIMIT; + if (fromIdStartTime <= endtime) { + // if provided id's start time falls before the end of the window, + // use it to construct the seek key + firstStartTime = fromIdStartTime; + first = kb.add(writeReverseOrderedLong(fromIdStartTime)).add(fromId) + .getBytesForLookup(); } + } + // if seek key wasn't constructed using fromId, construct it using end ts + if (first == null) { + firstStartTime = endtime; + first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup(); + } + byte[] last = null; + if (starttime != null) { + // if start time is not null, set a last key that will not be + // iterated past + last = KeyBuilder.newInstance().add(base).add(entityType) + .add(writeReverseOrderedLong(starttime)).getBytesForLookup(); + } + if (limit == null) { + // if limit is not specified, use the default + limit = DEFAULT_LIMIT; + } - TimelineEntities entities = new TimelineEntities(); - RollingLevelDB rollingdb = null; - if (usingPrimaryFilter) { - rollingdb = indexdb; - } else { - rollingdb = entitydb; - } + TimelineEntities entities = new TimelineEntities(); + RollingLevelDB rollingdb = null; + if (usingPrimaryFilter) { + rollingdb = indexdb; + } else { + rollingdb = entitydb; + } - DB db = rollingdb.getDBForStartTime(firstStartTime); - while (entities.getEntities().size() < limit && db != null) { - iterator = db.iterator(); + DB db = rollingdb.getDBForStartTime(firstStartTime); + while (entities.getEntities().size() < limit && db != null) { + try (DBIterator iterator = db.iterator()) { iterator.seek(first); // iterate until one of the following conditions is met: limit is @@ -726,7 +713,7 @@ public class RollingLevelDBTimelineStore extends AbstractService implements byte[] key = iterator.peekNext().getKey(); if (!prefixMatches(prefix, prefix.length, key) || (last != null && WritableComparator.compareBytes(key, 0, - key.length, last, 0, last.length) > 0)) { + key.length, last, 0, last.length) > 0)) { break; } // read the start time and entity id from the current key @@ -814,10 +801,8 @@ public class RollingLevelDBTimelineStore extends AbstractService implements } db = rollingdb.getPreviousDB(db); } - return entities; - } finally { - IOUtils.cleanup(LOG, iterator); } + return entities; } /** @@ -1459,15 +1444,14 @@ public class RollingLevelDBTimelineStore extends AbstractService implements long startTimesCount = 0; WriteBatch writeBatch = null; - DBIterator iterator = null; - try { - writeBatch = starttimedb.createWriteBatch(); - ReadOptions readOptions = new ReadOptions(); - readOptions.fillCache(false); - iterator = starttimedb.iterator(readOptions); + ReadOptions readOptions = new ReadOptions(); + readOptions.fillCache(false); + try (DBIterator iterator = starttimedb.iterator(readOptions)) { + // seek to the first start time entry iterator.seekToFirst(); + writeBatch = starttimedb.createWriteBatch(); // evaluate each start time entry to see if it needs to be evicted or not while (iterator.hasNext()) { @@ -1513,7 +1497,6 @@ public class RollingLevelDBTimelineStore extends AbstractService implements + " start time entities earlier than " + minStartTime); } finally { IOUtils.cleanup(LOG, writeBatch); - IOUtils.cleanup(LOG, iterator); } return startTimesCount; } @@ -1598,11 +1581,9 @@ public class RollingLevelDBTimelineStore extends AbstractService implements // TODO: make data retention work with the domain data as well @Override public void put(TimelineDomain domain) throws IOException { - WriteBatch domainWriteBatch = null; - WriteBatch ownerWriteBatch = null; - try { - domainWriteBatch = domaindb.createWriteBatch(); - ownerWriteBatch = ownerdb.createWriteBatch(); + try (WriteBatch domainWriteBatch = domaindb.createWriteBatch(); + WriteBatch ownerWriteBatch = ownerdb.createWriteBatch();) { + if (domain.getId() == null || domain.getId().length() == 0) { throw new IllegalArgumentException("Domain doesn't have an ID"); } @@ -1682,9 +1663,6 @@ public class RollingLevelDBTimelineStore extends AbstractService implements ownerWriteBatch.put(ownerLookupEntryKey, timestamps); domaindb.write(domainWriteBatch); ownerdb.write(ownerWriteBatch); - } finally { - IOUtils.cleanup(LOG, domainWriteBatch); - IOUtils.cleanup(LOG, ownerWriteBatch); } } @@ -1709,26 +1687,21 @@ public class RollingLevelDBTimelineStore extends AbstractService implements @Override public TimelineDomain getDomain(String domainId) throws IOException { - DBIterator iterator = null; - try { + try (DBIterator iterator = domaindb.iterator()) { byte[] prefix = KeyBuilder.newInstance().add(domainId) .getBytesForLookup(); - iterator = domaindb.iterator(); iterator.seek(prefix); return getTimelineDomain(iterator, domainId, prefix); - } finally { - IOUtils.cleanup(LOG, iterator); } } @Override public TimelineDomains getDomains(String owner) throws IOException { - DBIterator iterator = null; - try { + try (DBIterator iterator = ownerdb.iterator()) { byte[] prefix = KeyBuilder.newInstance().add(owner).getBytesForLookup(); + iterator.seek(prefix); List domains = new ArrayList(); - for (iterator = ownerdb.iterator(), iterator.seek(prefix); iterator - .hasNext();) { + while (iterator.hasNext()) { byte[] key = iterator.peekNext().getKey(); if (!prefixMatches(prefix, prefix.length, key)) { break; @@ -1761,8 +1734,6 @@ public class RollingLevelDBTimelineStore extends AbstractService implements TimelineDomains domainsToReturn = new TimelineDomains(); domainsToReturn.addDomains(domains); return domainsToReturn; - } finally { - IOUtils.cleanup(LOG, iterator); } }