HBASE-17118 StoreScanner leaked in KeyValueHeap (binlijin)

This commit is contained in:
tedyu 2016-11-17 07:52:52 -08:00
parent b9319496bc
commit f6fc94ede9
1 changed files with 44 additions and 28 deletions

View File

@ -26,6 +26,8 @@ import java.util.List;
import java.util.PriorityQueue; import java.util.PriorityQueue;
import java.util.Set; import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -46,6 +48,7 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
@InterfaceAudience.Private @InterfaceAudience.Private
public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
implements KeyValueScanner, InternalScanner { implements KeyValueScanner, InternalScanner {
private static final Log LOG = LogFactory.getLog(KeyValueHeap.class);
protected PriorityQueue<KeyValueScanner> heap = null; protected PriorityQueue<KeyValueScanner> heap = null;
// Holds the scanners when a ever a eager close() happens. All such eagerly closed // Holds the scanners when a ever a eager close() happens. All such eagerly closed
// scans are collected and when the final scanner.close() happens will perform the // scans are collected and when the final scanner.close() happens will perform the
@ -292,38 +295,51 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
} }
KeyValueScanner scanner = current; KeyValueScanner scanner = current;
while (scanner != null) { try {
Cell topKey = scanner.peek(); while (scanner != null) {
if (comparator.getComparator().compare(seekKey, topKey) <= 0) { Cell topKey = scanner.peek();
// Top KeyValue is at-or-after Seek KeyValue. We only know that all if (comparator.getComparator().compare(seekKey, topKey) <= 0) {
// scanners are at or after seekKey (because fake keys of // Top KeyValue is at-or-after Seek KeyValue. We only know that all
// scanners where a lazy-seek operation has been done are not greater // scanners are at or after seekKey (because fake keys of
// than their real next keys) but we still need to enforce our // scanners where a lazy-seek operation has been done are not greater
// invariant that the top scanner has done a real seek. This way // than their real next keys) but we still need to enforce our
// StoreScanner and RegionScanner do not have to worry about fake keys. // invariant that the top scanner has done a real seek. This way
heap.add(scanner); // StoreScanner and RegionScanner do not have to worry about fake
current = pollRealKV(); // keys.
return current != null; heap.add(scanner);
} scanner = null;
current = pollRealKV();
return current != null;
}
boolean seekResult; boolean seekResult;
if (isLazy && heap.size() > 0) { if (isLazy && heap.size() > 0) {
// If there is only one scanner left, we don't do lazy seek. // If there is only one scanner left, we don't do lazy seek.
seekResult = scanner.requestSeek(seekKey, forward, useBloom); seekResult = scanner.requestSeek(seekKey, forward, useBloom);
} else { } else {
seekResult = NonLazyKeyValueScanner.doRealSeek( seekResult = NonLazyKeyValueScanner.doRealSeek(scanner, seekKey,
scanner, seekKey, forward); forward);
} }
if (!seekResult) { if (!seekResult) {
this.scannersForDelayedClose.add(scanner); this.scannersForDelayedClose.add(scanner);
} else { } else {
heap.add(scanner); heap.add(scanner);
}
scanner = heap.poll();
if (scanner == null) {
current = null;
}
} }
scanner = heap.poll(); } catch (Exception e) {
if (scanner == null) { if (scanner != null) {
current = null; try {
scanner.close();
} catch (Exception ce) {
LOG.warn("close KeyValueScanner error", ce);
}
} }
throw e;
} }
// Heap is returning empty, scanner is done // Heap is returning empty, scanner is done