YARN-9040. Fixed memory leak in LevelDBCacheTimelineStore and DBIterator.

Contributed by Tarun Parimi

(cherry picked from commit 71e0b0d800)
This commit is contained in:
Eric Yang 2018-12-17 12:04:25 -05:00
parent 0ccfee31a5
commit b4fa1830a8
4 changed files with 99 additions and 59 deletions

View File

@ -42,7 +42,6 @@ import java.util.Comparator;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
@ -50,6 +49,7 @@ import java.util.Set;
import java.util.SortedSet; import java.util.SortedSet;
import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID; import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID;
import static org.apache.hadoop.yarn.server.timeline.TimelineStoreMapAdapter.CloseableIterator;
/** /**
* Map based implementation of {@link TimelineStore}. A hash map * Map based implementation of {@link TimelineStore}. A hash map
@ -114,21 +114,20 @@ abstract class KeyValueBasedTimelineStore
fields = EnumSet.allOf(Field.class); fields = EnumSet.allOf(Field.class);
} }
Iterator<TimelineEntity> entityIterator = null; TimelineEntity firstEntity = null;
if (fromId != null) { if (fromId != null) {
TimelineEntity firstEntity = entities.get(new EntityIdentifier(fromId, firstEntity = entities.get(new EntityIdentifier(fromId,
entityType)); entityType));
if (firstEntity == null) { if (firstEntity == null) {
return new TimelineEntities(); return new TimelineEntities();
} else {
entityIterator = entities.valueSetIterator(firstEntity);
} }
} }
if (entityIterator == null) {
entityIterator = entities.valueSetIterator();
}
List<TimelineEntity> entitiesSelected = new ArrayList<TimelineEntity>(); List<TimelineEntity> entitiesSelected = new ArrayList<TimelineEntity>();
try(CloseableIterator<TimelineEntity> entityIterator =
firstEntity == null ? entities.valueSetIterator() :
entities.valueSetIterator(firstEntity)) {
while (entityIterator.hasNext()) { while (entityIterator.hasNext()) {
TimelineEntity entity = entityIterator.next(); TimelineEntity entity = entityIterator.next();
if (entitiesSelected.size() >= limit) { if (entitiesSelected.size() >= limit) {
@ -143,13 +142,13 @@ abstract class KeyValueBasedTimelineStore
if (entity.getStartTime() > windowEnd) { if (entity.getStartTime() > windowEnd) {
continue; continue;
} }
if (fromTs != null && entityInsertTimes.get(new EntityIdentifier( if (fromTs != null && entityInsertTimes.get(
entity.getEntityId(), entity.getEntityType())) > fromTs) { new EntityIdentifier(entity.getEntityId(), entity.getEntityType()))
> fromTs) {
continue; continue;
} }
if (primaryFilter != null && if (primaryFilter != null && !KeyValueBasedTimelineStoreUtils
!KeyValueBasedTimelineStoreUtils.matchPrimaryFilter( .matchPrimaryFilter(entity.getPrimaryFilters(), primaryFilter)) {
entity.getPrimaryFilters(), primaryFilter)) {
continue; continue;
} }
if (secondaryFilters != null) { // AND logic if (secondaryFilters != null) { // AND logic
@ -157,8 +156,8 @@ abstract class KeyValueBasedTimelineStore
for (NameValuePair secondaryFilter : secondaryFilters) { for (NameValuePair secondaryFilter : secondaryFilters) {
if (secondaryFilter != null && !KeyValueBasedTimelineStoreUtils if (secondaryFilter != null && !KeyValueBasedTimelineStoreUtils
.matchPrimaryFilter(entity.getPrimaryFilters(), secondaryFilter) .matchPrimaryFilter(entity.getPrimaryFilters(), secondaryFilter)
&& !KeyValueBasedTimelineStoreUtils.matchFilter( && !KeyValueBasedTimelineStoreUtils
entity.getOtherInfo(), secondaryFilter)) { .matchFilter(entity.getOtherInfo(), secondaryFilter)) {
flag = false; flag = false;
break; break;
} }
@ -174,6 +173,8 @@ abstract class KeyValueBasedTimelineStore
entitiesSelected.add(entity); entitiesSelected.add(entity);
} }
} }
}
List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>(); List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>();
for (TimelineEntity entitySelected : entitiesSelected) { for (TimelineEntity entitySelected : entitiesSelected) {
entitiesToReturn.add(KeyValueBasedTimelineStoreUtils.maskFields( entitiesToReturn.add(KeyValueBasedTimelineStoreUtils.maskFields(
@ -569,6 +570,7 @@ abstract class KeyValueBasedTimelineStore
} }
return o; return o;
} }
} }
} }

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.timeline;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
@ -59,14 +60,15 @@ public class MemoryTimelineStore extends KeyValueBasedTimelineStore {
} }
@Override @Override
public Iterator<V> public CloseableIterator<V>
valueSetIterator() { valueSetIterator() {
return new TreeSet<>(internalMap.values()).iterator(); return wrapClosableIterator(new TreeSet<>(internalMap.values())
.iterator());
} }
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Iterator<V> valueSetIterator(V minV) { public CloseableIterator<V> valueSetIterator(V minV) {
if (minV instanceof Comparable) { if (minV instanceof Comparable) {
TreeSet<V> tempTreeSet = new TreeSet<>(); TreeSet<V> tempTreeSet = new TreeSet<>();
for (V value : internalMap.values()) { for (V value : internalMap.values()) {
@ -74,11 +76,38 @@ public class MemoryTimelineStore extends KeyValueBasedTimelineStore {
tempTreeSet.add(value); tempTreeSet.add(value);
} }
} }
return tempTreeSet.iterator(); return wrapClosableIterator(tempTreeSet.iterator());
} else { } else {
return valueSetIterator(); return valueSetIterator();
} }
} }
private CloseableIterator<V> wrapClosableIterator(
final Iterator<V> iterator) {
return new CloseableIterator<V>() {
private final Iterator<V> internalIterator = iterator;
@Override
public void close() throws IOException {
// Not implemented
}
@Override
public boolean hasNext() {
return internalIterator.hasNext();
}
@Override
public V next() {
return internalIterator.next();
}
@Override
public void remove() {
internalIterator.remove();
}
};
}
} }
public MemoryTimelineStore() { public MemoryTimelineStore() {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.timeline; package org.apache.hadoop.yarn.server.timeline;
import java.io.Closeable;
import java.util.Iterator; import java.util.Iterator;
/** /**
@ -48,7 +49,7 @@ interface TimelineStoreMapAdapter<K, V> {
/** /**
* @return the iterator of the value set of the map * @return the iterator of the value set of the map
*/ */
Iterator<V> valueSetIterator(); CloseableIterator<V> valueSetIterator();
/** /**
* Return the iterator of the value set of the map, starting from minV if type * Return the iterator of the value set of the map, starting from minV if type
@ -56,5 +57,9 @@ interface TimelineStoreMapAdapter<K, V> {
* @param minV * @param minV
* @return * @return
*/ */
Iterator<V> valueSetIterator(V minV); CloseableIterator<V> valueSetIterator(V minV);
interface CloseableIterator<V> extends Iterator<V>, Closeable {}
} }

View File

@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
/** /**
@ -211,18 +210,18 @@ public class LevelDBCacheTimelineStore extends KeyValueBasedTimelineStore {
} }
@Override @Override
public Iterator<V> valueSetIterator() { public CloseableIterator<V> valueSetIterator() {
return getIterator(null, Long.MAX_VALUE); return getIterator(null, Long.MAX_VALUE);
} }
@Override @Override
public Iterator<V> valueSetIterator(V minV) { public CloseableIterator<V> valueSetIterator(V minV) {
return getIterator( return getIterator(
new EntityIdentifier(minV.getEntityId(), minV.getEntityType()), new EntityIdentifier(minV.getEntityId(), minV.getEntityType()),
minV.getStartTime()); minV.getStartTime());
} }
private Iterator<V> getIterator( private CloseableIterator<V> getIterator(
EntityIdentifier startId, long startTimeMax) { EntityIdentifier startId, long startTimeMax) {
final DBIterator internalDbIterator = entityDb.iterator(); final DBIterator internalDbIterator = entityDb.iterator();
@ -247,7 +246,7 @@ public class LevelDBCacheTimelineStore extends KeyValueBasedTimelineStore {
= entityPrefixKeyBuilder.getBytesForLookup(); = entityPrefixKeyBuilder.getBytesForLookup();
internalDbIterator.seek(startPrefixBytes); internalDbIterator.seek(startPrefixBytes);
return new Iterator<V>() { return new CloseableIterator<V>() {
@Override @Override
public boolean hasNext() { public boolean hasNext() {
if (!internalDbIterator.hasNext()) { if (!internalDbIterator.hasNext()) {
@ -284,6 +283,11 @@ public class LevelDBCacheTimelineStore extends KeyValueBasedTimelineStore {
LOG.error("LevelDB map adapter does not support iterate-and-remove" LOG.error("LevelDB map adapter does not support iterate-and-remove"
+ " use cases. "); + " use cases. ");
} }
@Override
public void close() throws IOException {
internalDbIterator.close();
}
}; };
} }
static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();