HBASE-13827 Delayed scanner close in KeyValueHeap and StoreScanner.

This commit is contained in:
anoopsjohn 2015-06-06 08:02:35 +05:30
parent c1be65ecf0
commit fef6d7f48c
8 changed files with 88 additions and 43 deletions

View File

@ -288,6 +288,11 @@ public class HalfStoreFileReader extends StoreFile.Reader {
public Cell getNextIndexedKey() { public Cell getNextIndexedKey() {
return null; return null;
} }
@Override
public void close() {
this.delegate.close();
}
}; };
} }

View File

@ -1077,6 +1077,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
new KeyValue.KeyOnlyKeyValue(blockBuffer.array(), blockBuffer.arrayOffset() new KeyValue.KeyOnlyKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen)); + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen));
} }
@Override
public void close() {
// HBASE-12295 will add code here.
}
} }
public Path getPath() { public Path getPath() {

View File

@ -149,4 +149,9 @@ public interface HFileScanner {
* @return the next key in the index (the key to seek to the next block) * @return the next key in the index (the key to seek to the next block)
*/ */
Cell getNextIndexedKey(); Cell getNextIndexedKey();
/**
* Close this HFile scanner and do necessary cleanup.
*/
void close();
} }

View File

@ -21,8 +21,10 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException; import java.io.IOException;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.PriorityQueue; import java.util.PriorityQueue;
import java.util.Set;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
@ -59,7 +61,9 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
protected KeyValueScanner current = null; protected KeyValueScanner current = null;
protected KVScannerComparator comparator; protected KVScannerComparator comparator;
protected Set<KeyValueScanner> scannersForDelayedClose = new HashSet<KeyValueScanner>();
/** /**
* Constructor. This KeyValueHeap will handle closing of passed in * Constructor. This KeyValueHeap will handle closing of passed in
* KeyValueScanners. * KeyValueScanners.
@ -87,7 +91,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
if (scanner.peek() != null) { if (scanner.peek() != null) {
this.heap.add(scanner); this.heap.add(scanner);
} else { } else {
scanner.close(); this.scannersForDelayedClose.add(scanner);
} }
} }
this.current = pollRealKV(); this.current = pollRealKV();
@ -108,7 +112,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
Cell kvReturn = this.current.next(); Cell kvReturn = this.current.next();
Cell kvNext = this.current.peek(); Cell kvNext = this.current.peek();
if (kvNext == null) { if (kvNext == null) {
this.current.close(); this.scannersForDelayedClose.add(this.current);
this.current = pollRealKV(); this.current = pollRealKV();
} else { } else {
KeyValueScanner topScanner = this.heap.peek(); KeyValueScanner topScanner = this.heap.peek();
@ -154,7 +158,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
*/ */
if (pee == null || !moreCells) { if (pee == null || !moreCells) {
this.current.close(); this.scannersForDelayedClose.add(this.current);
} else { } else {
this.heap.add(this.current); this.heap.add(this.current);
} }
@ -210,6 +214,10 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
} }
public void close() { public void close() {
for (KeyValueScanner scanner : this.scannersForDelayedClose) {
scanner.close();
}
this.scannersForDelayedClose.clear();
if (this.current != null) { if (this.current != null) {
this.current.close(); this.current.close();
} }
@ -311,7 +319,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
} }
if (!seekResult) { if (!seekResult) {
scanner.close(); this.scannersForDelayedClose.add(scanner);
} else { } else {
heap.add(scanner); heap.add(scanner);
} }
@ -364,12 +372,12 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
} else { } else {
// Close the scanner because we did a real seek and found out there // Close the scanner because we did a real seek and found out there
// are no more KVs. // are no more KVs.
kvScanner.close(); this.scannersForDelayedClose.add(kvScanner);
} }
} else { } else {
// Close the scanner because it has already run out of KVs even before // Close the scanner because it has already run out of KVs even before
// we had to do a real seek on it. // we had to do a real seek on it.
kvScanner.close(); this.scannersForDelayedClose.add(kvScanner);
} }
kvScanner = heap.poll(); kvScanner = heap.poll();
} }

View File

@ -85,7 +85,7 @@ public class ReversedKeyValueHeap extends KeyValueHeap {
} }
if (!scanner.seekToPreviousRow(seekKey)) { if (!scanner.seekToPreviousRow(seekKey)) {
scanner.close(); this.scannersForDelayedClose.add(scanner);
} else { } else {
heap.add(scanner); heap.add(scanner);
} }
@ -114,7 +114,7 @@ public class ReversedKeyValueHeap extends KeyValueHeap {
return current != null; return current != null;
} }
if (!scanner.backwardSeek(seekKey)) { if (!scanner.backwardSeek(seekKey)) {
scanner.close(); this.scannersForDelayedClose.add(scanner);
} else { } else {
heap.add(scanner); heap.add(scanner);
} }
@ -134,7 +134,7 @@ public class ReversedKeyValueHeap extends KeyValueHeap {
if (this.current.seekToPreviousRow(kvReturn)) { if (this.current.seekToPreviousRow(kvReturn)) {
this.heap.add(this.current); this.heap.add(this.current);
} else { } else {
this.current.close(); this.scannersForDelayedClose.add(this.current);
} }
this.current = pollRealKV(); this.current = pollRealKV();
} else { } else {

View File

@ -27,8 +27,6 @@ import java.util.List;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
@ -46,8 +44,6 @@ import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
*/ */
@InterfaceAudience.LimitedPrivate("Coprocessor") @InterfaceAudience.LimitedPrivate("Coprocessor")
public class StoreFileScanner implements KeyValueScanner { public class StoreFileScanner implements KeyValueScanner {
private static final Log LOG = LogFactory.getLog(HStore.class);
// the reader it comes from: // the reader it comes from:
private final StoreFile.Reader reader; private final StoreFile.Reader reader;
private final HFileScanner hfs; private final HFileScanner hfs;
@ -158,7 +154,7 @@ public class StoreFileScanner implements KeyValueScanner {
try { try {
try { try {
if(!seekAtOrAfter(hfs, key)) { if(!seekAtOrAfter(hfs, key)) {
close(); this.cur = null;
return false; return false;
} }
@ -185,7 +181,7 @@ public class StoreFileScanner implements KeyValueScanner {
try { try {
try { try {
if (!reseekAtOrAfter(hfs, key)) { if (!reseekAtOrAfter(hfs, key)) {
close(); this.cur = null;
return false; return false;
} }
setCurrentCell(hfs.getKeyValue()); setCurrentCell(hfs.getKeyValue());
@ -219,7 +215,7 @@ public class StoreFileScanner implements KeyValueScanner {
Cell startKV = cur; Cell startKV = cur;
while(enforceMVCC while(enforceMVCC
&& cur != null && cur != null
&& (cur.getMvccVersion() > readPt)) { && (cur.getSequenceId() > readPt)) {
hfs.next(); hfs.next();
setCurrentCell(hfs.getKeyValue()); setCurrentCell(hfs.getKeyValue());
if (this.stopSkippingKVsIfNextRow if (this.stopSkippingKVsIfNextRow
@ -229,7 +225,6 @@ public class StoreFileScanner implements KeyValueScanner {
} }
if (cur == null) { if (cur == null) {
close();
return false; return false;
} }
@ -237,8 +232,8 @@ public class StoreFileScanner implements KeyValueScanner {
} }
public void close() { public void close() {
// Nothing to close on HFileScanner?
cur = null; cur = null;
this.hfs.close();
} }
/** /**
@ -421,7 +416,6 @@ public class StoreFileScanner implements KeyValueScanner {
} }
@Override @Override
@SuppressWarnings("deprecation")
public boolean seekToPreviousRow(Cell key) throws IOException { public boolean seekToPreviousRow(Cell key) throws IOException {
try { try {
try { try {
@ -429,7 +423,7 @@ public class StoreFileScanner implements KeyValueScanner {
key.getRowLength()); key.getRowLength());
if (seekCount != null) seekCount.incrementAndGet(); if (seekCount != null) seekCount.incrementAndGet();
if (!hfs.seekBefore(seekKey)) { if (!hfs.seekBefore(seekKey)) {
close(); this.cur = null;
return false; return false;
} }
KeyValue firstKeyOfPreviousRow = KeyValueUtil.createFirstOnRow(hfs.getKeyValue() KeyValue firstKeyOfPreviousRow = KeyValueUtil.createFirstOnRow(hfs.getKeyValue()
@ -437,7 +431,7 @@ public class StoreFileScanner implements KeyValueScanner {
if (seekCount != null) seekCount.incrementAndGet(); if (seekCount != null) seekCount.incrementAndGet();
if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) { if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) {
close(); this.cur = null;
return false; return false;
} }

View File

@ -22,8 +22,10 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.NavigableSet; import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -84,6 +86,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
protected final long maxRowSize; protected final long maxRowSize;
protected final long cellsPerHeartbeatCheck; protected final long cellsPerHeartbeatCheck;
protected Set<KeyValueHeap> heapsForDelayedClose = new HashSet<KeyValueHeap>();
/** /**
* The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
* KVs skipped via seeking to next row/column. TODO: estimate them? * KVs skipped via seeking to next row/column. TODO: estimate them?
@ -437,17 +441,32 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override @Override
public void close() { public void close() {
close(true);
}
private void close(boolean withHeapClose){
lock.lock(); lock.lock();
try { try {
if (this.closing) return; if (this.closing) return;
this.closing = true; this.closing = true;
// under test, we dont have a this.store // under test, we dont have a this.store
if (this.store != null) if (this.store != null) this.store.deleteChangedReaderObserver(this);
this.store.deleteChangedReaderObserver(this); if (withHeapClose) {
if (this.heap != null) for (KeyValueHeap h : this.heapsForDelayedClose) {
this.heap.close(); h.close();
this.heap = null; // CLOSED! }
this.lastTop = null; // If both are null, we are closed. this.heapsForDelayedClose.clear();
if (this.heap != null) {
this.heap.close();
this.heap = null; // CLOSED!
}
} else {
if (this.heap != null) {
this.heapsForDelayedClose.add(this.heap);
this.heap = null;
}
}
this.lastTop = null; // If both are null, we are closed.
} finally { } finally {
lock.unlock(); lock.unlock();
} }
@ -491,13 +510,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// if the heap was left null, then the scanners had previously run out anyways, close and // if the heap was left null, then the scanners had previously run out anyways, close and
// return. // return.
if (this.heap == null) { if (this.heap == null) {
close(); close(false);// Do all cleanup except heap.close()
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
} }
Cell peeked = this.heap.peek(); Cell peeked = this.heap.peek();
if (peeked == null) { if (peeked == null) {
close(); close(false);// Do all cleanup except heap.close()
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
} }
@ -547,7 +566,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
Filter f = matcher.getFilter(); Filter f = matcher.getFilter();
if (f != null) { if (f != null) {
// TODO convert Scan Query Matcher to be Cell instead of KV based ?
cell = f.transformCell(cell); cell = f.transformCell(cell);
} }
@ -604,7 +622,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
case DONE_SCAN: case DONE_SCAN:
close(); close(false);// Do all cleanup except heap.close()
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
case SEEK_NEXT_ROW: case SEEK_NEXT_ROW:
@ -626,7 +644,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
break; break;
case SEEK_NEXT_USING_HINT: case SEEK_NEXT_USING_HINT:
// TODO convert resee to Cell?
Cell nextKV = matcher.getNextKeyHint(cell); Cell nextKV = matcher.getNextKeyHint(cell);
if (nextKV != null) { if (nextKV != null) {
seekAsDirection(nextKV); seekAsDirection(nextKV);
@ -645,7 +662,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
} }
// No more keys // No more keys
close(); close(false);// Do all cleanup except heap.close()
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
} finally { } finally {
lock.unlock(); lock.unlock();
@ -705,7 +722,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
//DebugPrint.println("SS updateReaders, topKey = " + lastTop); //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
// close scanners to old obsolete Store files // close scanners to old obsolete Store files
this.heap.close(); // bubble thru and close all scanners. this.heapsForDelayedClose.add(this.heap);// Don't close now. Delay it till StoreScanner#close
this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
// Let the next() call handle re-creating and seeking // Let the next() call handle re-creating and seeking

View File

@ -190,12 +190,14 @@ public class TestKeyValueHeap extends HBaseTestCase {
l1.add(new KeyValue(row1, fam1, col5, data)); l1.add(new KeyValue(row1, fam1, col5, data));
l1.add(new KeyValue(row2, fam1, col1, data)); l1.add(new KeyValue(row2, fam1, col1, data));
l1.add(new KeyValue(row2, fam1, col2, data)); l1.add(new KeyValue(row2, fam1, col2, data));
scanners.add(new Scanner(l1)); Scanner s1 = new Scanner(l1);
scanners.add(s1);
List<Cell> l2 = new ArrayList<Cell>(); List<Cell> l2 = new ArrayList<Cell>();
l2.add(new KeyValue(row1, fam1, col1, data)); l2.add(new KeyValue(row1, fam1, col1, data));
l2.add(new KeyValue(row1, fam1, col2, data)); l2.add(new KeyValue(row1, fam1, col2, data));
scanners.add(new Scanner(l2)); Scanner s2 = new Scanner(l2);
scanners.add(s2);
List<Cell> l3 = new ArrayList<Cell>(); List<Cell> l3 = new ArrayList<Cell>();
l3.add(new KeyValue(row1, fam1, col3, data)); l3.add(new KeyValue(row1, fam1, col3, data));
@ -203,16 +205,25 @@ public class TestKeyValueHeap extends HBaseTestCase {
l3.add(new KeyValue(row1, fam2, col1, data)); l3.add(new KeyValue(row1, fam2, col1, data));
l3.add(new KeyValue(row1, fam2, col2, data)); l3.add(new KeyValue(row1, fam2, col2, data));
l3.add(new KeyValue(row2, fam1, col3, data)); l3.add(new KeyValue(row2, fam1, col3, data));
scanners.add(new Scanner(l3)); Scanner s3 = new Scanner(l3);
scanners.add(s3);
List<Cell> l4 = new ArrayList<Cell>(); List<Cell> l4 = new ArrayList<Cell>();
scanners.add(new Scanner(l4)); Scanner s4 = new Scanner(l4);
scanners.add(s4);
//Creating KeyValueHeap //Creating KeyValueHeap
KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparator.COMPARATOR); KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparator.COMPARATOR);
while(kvh.next() != null); while(kvh.next() != null);
// Once the internal scanners go out of Cells, those will be removed from KVHeap's priority
// queue and added to a Set for lazy close. The actual close will happen only on KVHeap#close()
assertEquals(4, kvh.scannersForDelayedClose.size());
assertTrue(kvh.scannersForDelayedClose.contains(s1));
assertTrue(kvh.scannersForDelayedClose.contains(s2));
assertTrue(kvh.scannersForDelayedClose.contains(s3));
assertTrue(kvh.scannersForDelayedClose.contains(s4));
kvh.close();
for(KeyValueScanner scanner : scanners) { for(KeyValueScanner scanner : scanners) {
assertTrue(((Scanner)scanner).isClosed()); assertTrue(((Scanner)scanner).isClosed());
} }