HBASE-16032 Possible memory leak in StoreScanner

This commit is contained in:
Yu Li 2016-06-21 20:06:33 +08:00
parent f06945ae6c
commit c6b8c9bb02
2 changed files with 49 additions and 29 deletions

View File

@ -5688,26 +5688,39 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size()); List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size());
List<KeyValueScanner> joinedScanners List<KeyValueScanner> joinedScanners
= new ArrayList<KeyValueScanner>(scan.getFamilyMap().size()); = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size());
if (additionalScanners != null) { // Store all already instantiated scanners for exception handling
List<KeyValueScanner> instantiatedScanners = new ArrayList<KeyValueScanner>();
// handle additionalScanners
if (additionalScanners != null && !additionalScanners.isEmpty()) {
scanners.addAll(additionalScanners); scanners.addAll(additionalScanners);
instantiatedScanners.addAll(additionalScanners);
} }
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) { try {
Store store = stores.get(entry.getKey()); for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
KeyValueScanner scanner; Store store = stores.get(entry.getKey());
try { KeyValueScanner scanner;
scanner = store.getScanner(scan, entry.getValue(), this.readPt); try {
} catch (FileNotFoundException e) { scanner = store.getScanner(scan, entry.getValue(), this.readPt);
throw handleFileNotFound(e); } catch (FileNotFoundException e) {
throw handleFileNotFound(e);
}
instantiatedScanners.add(scanner);
if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
|| this.filter.isFamilyEssential(entry.getKey())) {
scanners.add(scanner);
} else {
joinedScanners.add(scanner);
}
} }
if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand() initializeKVHeap(scanners, joinedScanners, region);
|| this.filter.isFamilyEssential(entry.getKey())) { } catch (IOException e) {
scanners.add(scanner); // close all already instantiated scanners before throwing the exception
} else { for (KeyValueScanner scanner : instantiatedScanners) {
joinedScanners.add(scanner); scanner.close();
} }
throw e;
} }
initializeKVHeap(scanners, joinedScanners, region);
} }
protected void initializeKVHeap(List<KeyValueScanner> scanners, protected void initializeKVHeap(List<KeyValueScanner> scanners,

View File

@ -198,24 +198,31 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.store.addChangedReaderObserver(this); this.store.addChangedReaderObserver(this);
// Pass columns to try to filter out unnecessary StoreFiles. try {
List<KeyValueScanner> scanners = getScannersNoCompaction(); // Pass columns to try to filter out unnecessary StoreFiles.
List<KeyValueScanner> scanners = getScannersNoCompaction();
// Seek all scanners to the start of the Row (or if the exact matching row // Seek all scanners to the start of the Row (or if the exact matching row
// key does not exist, then to the start of the next matching Row). // key does not exist, then to the start of the next matching Row).
// Always check bloom filter to optimize the top row seek for delete // Always check bloom filter to optimize the top row seek for delete
// family marker. // family marker.
seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
&& lazySeekEnabledGlobally, parallelSeekEnabled); parallelSeekEnabled);
// set storeLimit // set storeLimit
this.storeLimit = scan.getMaxResultsPerColumnFamily(); this.storeLimit = scan.getMaxResultsPerColumnFamily();
// set rowOffset // set rowOffset
this.storeOffset = scan.getRowOffsetPerColumnFamily(); this.storeOffset = scan.getRowOffsetPerColumnFamily();
addCurrentScanners(scanners); addCurrentScanners(scanners);
// Combine all seeked scanners with a heap // Combine all seeked scanners with a heap
resetKVHeap(scanners, store.getComparator()); resetKVHeap(scanners, store.getComparator());
} catch (IOException e) {
// remove us from the HStore#changedReaderObservers here or we'll have no chance to
// and might cause memory leak
this.store.deleteChangedReaderObserver(this);
throw e;
}
} }
/** /**