From b4fa1830a8e7d642d6aecc6d0a5c0460b51fc822 Mon Sep 17 00:00:00 2001 From: Eric Yang Date: Mon, 17 Dec 2018 12:04:25 -0500 Subject: [PATCH] YARN-9040. Fixed memory leak in LevelDBCacheTimelineStore and DBIterator. Contributed by Tarun Parimi (cherry picked from commit 71e0b0d8005ea1952dc7e582db15c2ac09df7c91) --- .../timeline/KeyValueBasedTimelineStore.java | 98 ++++++++++--------- .../server/timeline/MemoryTimelineStore.java | 37 ++++++- .../timeline/TimelineStoreMapAdapter.java | 9 +- .../timeline/LevelDBCacheTimelineStore.java | 14 ++- 4 files changed, 99 insertions(+), 59 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/KeyValueBasedTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/KeyValueBasedTimelineStore.java index 82db770191b..c24d904a2ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/KeyValueBasedTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/KeyValueBasedTimelineStore.java @@ -42,7 +42,6 @@ import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -50,6 +49,7 @@ import java.util.Set; import java.util.SortedSet; import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID; +import static org.apache.hadoop.yarn.server.timeline.TimelineStoreMapAdapter.CloseableIterator; /** * Map based implementation of {@link TimelineStore}. A hash map @@ -114,66 +114,67 @@ abstract class KeyValueBasedTimelineStore fields = EnumSet.allOf(Field.class); } - Iterator entityIterator = null; + TimelineEntity firstEntity = null; if (fromId != null) { - TimelineEntity firstEntity = entities.get(new EntityIdentifier(fromId, + firstEntity = entities.get(new EntityIdentifier(fromId, entityType)); if (firstEntity == null) { return new TimelineEntities(); - } else { - entityIterator = entities.valueSetIterator(firstEntity); } } - if (entityIterator == null) { - entityIterator = entities.valueSetIterator(); - } List entitiesSelected = new ArrayList(); - while (entityIterator.hasNext()) { - TimelineEntity entity = entityIterator.next(); - if (entitiesSelected.size() >= limit) { - break; - } - if (!entity.getEntityType().equals(entityType)) { - continue; - } - if (entity.getStartTime() <= windowStart) { - continue; - } - if (entity.getStartTime() > windowEnd) { - continue; - } - if (fromTs != null && entityInsertTimes.get(new EntityIdentifier( - entity.getEntityId(), entity.getEntityType())) > fromTs) { - continue; - } - if (primaryFilter != null && - !KeyValueBasedTimelineStoreUtils.matchPrimaryFilter( - entity.getPrimaryFilters(), primaryFilter)) { - continue; - } - if (secondaryFilters != null) { // AND logic - boolean flag = true; - for (NameValuePair secondaryFilter : secondaryFilters) { - if (secondaryFilter != null && !KeyValueBasedTimelineStoreUtils - .matchPrimaryFilter(entity.getPrimaryFilters(), secondaryFilter) - && !KeyValueBasedTimelineStoreUtils.matchFilter( - entity.getOtherInfo(), secondaryFilter)) { - flag = false; - break; - } + + try(CloseableIterator entityIterator = + firstEntity == null ? entities.valueSetIterator() : + entities.valueSetIterator(firstEntity)) { + while (entityIterator.hasNext()) { + TimelineEntity entity = entityIterator.next(); + if (entitiesSelected.size() >= limit) { + break; } - if (!flag) { + if (!entity.getEntityType().equals(entityType)) { continue; } - } - if (entity.getDomainId() == null) { - entity.setDomainId(DEFAULT_DOMAIN_ID); - } - if (checkAcl == null || checkAcl.check(entity)) { - entitiesSelected.add(entity); + if (entity.getStartTime() <= windowStart) { + continue; + } + if (entity.getStartTime() > windowEnd) { + continue; + } + if (fromTs != null && entityInsertTimes.get( + new EntityIdentifier(entity.getEntityId(), entity.getEntityType())) + > fromTs) { + continue; + } + if (primaryFilter != null && !KeyValueBasedTimelineStoreUtils + .matchPrimaryFilter(entity.getPrimaryFilters(), primaryFilter)) { + continue; + } + if (secondaryFilters != null) { // AND logic + boolean flag = true; + for (NameValuePair secondaryFilter : secondaryFilters) { + if (secondaryFilter != null && !KeyValueBasedTimelineStoreUtils + .matchPrimaryFilter(entity.getPrimaryFilters(), secondaryFilter) + && !KeyValueBasedTimelineStoreUtils + .matchFilter(entity.getOtherInfo(), secondaryFilter)) { + flag = false; + break; + } + } + if (!flag) { + continue; + } + } + if (entity.getDomainId() == null) { + entity.setDomainId(DEFAULT_DOMAIN_ID); + } + if (checkAcl == null || checkAcl.check(entity)) { + entitiesSelected.add(entity); + } } } + List entitiesToReturn = new ArrayList(); for (TimelineEntity entitySelected : entitiesSelected) { entitiesToReturn.add(KeyValueBasedTimelineStoreUtils.maskFields( @@ -569,6 +570,7 @@ abstract class KeyValueBasedTimelineStore } return o; } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java index 5c2db0004e0..f4aea45b350 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.timeline; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -59,14 +60,15 @@ public class MemoryTimelineStore extends KeyValueBasedTimelineStore { } @Override - public Iterator + public CloseableIterator valueSetIterator() { - return new TreeSet<>(internalMap.values()).iterator(); + return wrapClosableIterator(new TreeSet<>(internalMap.values()) + .iterator()); } @Override @SuppressWarnings("unchecked") - public Iterator valueSetIterator(V minV) { + public CloseableIterator valueSetIterator(V minV) { if (minV instanceof Comparable) { TreeSet tempTreeSet = new TreeSet<>(); for (V value : internalMap.values()) { @@ -74,11 +76,38 @@ public class MemoryTimelineStore extends KeyValueBasedTimelineStore { tempTreeSet.add(value); } } - return tempTreeSet.iterator(); + return wrapClosableIterator(tempTreeSet.iterator()); } else { return valueSetIterator(); } } + + private CloseableIterator wrapClosableIterator( + final Iterator iterator) { + return new CloseableIterator() { + private final Iterator internalIterator = iterator; + @Override + public void close() throws IOException { + // Not implemented + } + + @Override + public boolean hasNext() { + return internalIterator.hasNext(); + } + + @Override + public V next() { + return internalIterator.next(); + } + + @Override + public void remove() { + internalIterator.remove(); + } + }; + + } } public MemoryTimelineStore() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreMapAdapter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreMapAdapter.java index 175ed0b4457..38fc28efeab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreMapAdapter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreMapAdapter.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.timeline; +import java.io.Closeable; import java.util.Iterator; /** @@ -48,7 +49,7 @@ interface TimelineStoreMapAdapter { /** * @return the iterator of the value set of the map */ - Iterator valueSetIterator(); + CloseableIterator valueSetIterator(); /** * Return the iterator of the value set of the map, starting from minV if type @@ -56,5 +57,9 @@ interface TimelineStoreMapAdapter { * @param minV * @return */ - Iterator valueSetIterator(V minV); + CloseableIterator valueSetIterator(V minV); + + interface CloseableIterator extends Iterator, Closeable {} } + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LevelDBCacheTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LevelDBCacheTimelineStore.java index f7a3d01b731..9b1ffdcce41 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LevelDBCacheTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LevelDBCacheTimelineStore.java @@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.util.Iterator; import java.util.Map; /** @@ -211,18 +210,18 @@ public class LevelDBCacheTimelineStore extends KeyValueBasedTimelineStore { } @Override - public Iterator valueSetIterator() { + public CloseableIterator valueSetIterator() { return getIterator(null, Long.MAX_VALUE); } @Override - public Iterator valueSetIterator(V minV) { + public CloseableIterator valueSetIterator(V minV) { return getIterator( new EntityIdentifier(minV.getEntityId(), minV.getEntityType()), minV.getStartTime()); } - private Iterator getIterator( + private CloseableIterator getIterator( EntityIdentifier startId, long startTimeMax) { final DBIterator internalDbIterator = entityDb.iterator(); @@ -247,7 +246,7 @@ public class LevelDBCacheTimelineStore extends KeyValueBasedTimelineStore { = entityPrefixKeyBuilder.getBytesForLookup(); internalDbIterator.seek(startPrefixBytes); - return new Iterator() { + return new CloseableIterator() { @Override public boolean hasNext() { if (!internalDbIterator.hasNext()) { @@ -284,6 +283,11 @@ public class LevelDBCacheTimelineStore extends KeyValueBasedTimelineStore { LOG.error("LevelDB map adapter does not support iterate-and-remove" + " use cases. "); } + + @Override + public void close() throws IOException { + internalDbIterator.close(); + } }; } static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();