HBASE-23356 When construct StoreScanner throw exceptions it is possible to left some KeyValueScanner not closed. (#891)
Signed-off-by: GuangxuCheng <guangxucheng@gmail.com>
This commit is contained in:
parent
0f166edc3d
commit
580d65ee4d
|
@ -1270,18 +1270,34 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
this.lock.readLock().unlock();
|
this.lock.readLock().unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
// First the store file scanners
|
try {
|
||||||
|
// First the store file scanners
|
||||||
|
|
||||||
// TODO this used to get the store files in descending order,
|
// TODO this used to get the store files in descending order,
|
||||||
// but now we get them in ascending order, which I think is
|
// but now we get them in ascending order, which I think is
|
||||||
// actually more correct, since memstore get put at the end.
|
// actually more correct, since memstore get put at the end.
|
||||||
List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan,
|
List<StoreFileScanner> sfScanners = StoreFileScanner
|
||||||
cacheBlocks, usePread, isCompaction, false, matcher, readPt);
|
.getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, false,
|
||||||
List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
|
matcher, readPt);
|
||||||
scanners.addAll(sfScanners);
|
List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
|
||||||
// Then the memstore scanners
|
scanners.addAll(sfScanners);
|
||||||
scanners.addAll(memStoreScanners);
|
// Then the memstore scanners
|
||||||
return scanners;
|
scanners.addAll(memStoreScanners);
|
||||||
|
return scanners;
|
||||||
|
} catch (Throwable t) {
|
||||||
|
clearAndClose(memStoreScanners);
|
||||||
|
throw t instanceof IOException ? (IOException) t : new IOException(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void clearAndClose(List<KeyValueScanner> scanners) {
|
||||||
|
if (scanners == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (KeyValueScanner s : scanners) {
|
||||||
|
s.close();
|
||||||
|
}
|
||||||
|
scanners.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1335,15 +1351,21 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
this.lock.readLock().unlock();
|
this.lock.readLock().unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
|
try {
|
||||||
cacheBlocks, usePread, isCompaction, false, matcher, readPt);
|
List<StoreFileScanner> sfScanners = StoreFileScanner
|
||||||
List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
|
.getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, false, matcher,
|
||||||
scanners.addAll(sfScanners);
|
readPt);
|
||||||
// Then the memstore scanners
|
List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
|
||||||
if (memStoreScanners != null) {
|
scanners.addAll(sfScanners);
|
||||||
scanners.addAll(memStoreScanners);
|
// Then the memstore scanners
|
||||||
|
if (memStoreScanners != null) {
|
||||||
|
scanners.addAll(memStoreScanners);
|
||||||
|
}
|
||||||
|
return scanners;
|
||||||
|
} catch (Throwable t) {
|
||||||
|
clearAndClose(memStoreScanners);
|
||||||
|
throw t instanceof IOException ? (IOException) t : new IOException(t);
|
||||||
}
|
}
|
||||||
return scanners;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -236,9 +236,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
|
|
||||||
store.addChangedReaderObserver(this);
|
store.addChangedReaderObserver(this);
|
||||||
|
|
||||||
|
List<KeyValueScanner> scanners = null;
|
||||||
try {
|
try {
|
||||||
// Pass columns to try to filter out unnecessary StoreFiles.
|
// Pass columns to try to filter out unnecessary StoreFiles.
|
||||||
List<KeyValueScanner> scanners = selectScannersFrom(store,
|
scanners = selectScannersFrom(store,
|
||||||
store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),
|
store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),
|
||||||
scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt));
|
scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt));
|
||||||
|
|
||||||
|
@ -258,6 +259,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
// Combine all seeked scanners with a heap
|
// Combine all seeked scanners with a heap
|
||||||
resetKVHeap(scanners, comparator);
|
resetKVHeap(scanners, comparator);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
clearAndClose(scanners);
|
||||||
// remove us from the HStore#changedReaderObservers here or we'll have no chance to
|
// remove us from the HStore#changedReaderObservers here or we'll have no chance to
|
||||||
// and might cause memory leak
|
// and might cause memory leak
|
||||||
store.deleteChangedReaderObserver(this);
|
store.deleteChangedReaderObserver(this);
|
||||||
|
@ -870,6 +872,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void clearAndClose(List<KeyValueScanner> scanners) {
|
private static void clearAndClose(List<KeyValueScanner> scanners) {
|
||||||
|
if (scanners == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
for (KeyValueScanner s : scanners) {
|
for (KeyValueScanner s : scanners) {
|
||||||
s.close();
|
s.close();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue