HBASE-1927 Scanners not closed properly in certain circumstances
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@828816 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a1f63322bb
commit
98aee5440f
|
@ -78,6 +78,7 @@ Release 0.21.0 - Unreleased
|
|||
HBASE-1777 column length is not checked before saved to memstore
|
||||
HBASE-1925 IllegalAccessError: Has not been initialized (getMaxSequenceId)
|
||||
HBASE-1929 If hbase-default.xml is not in CP, zk session timeout is 10 secs!
|
||||
HBASE-1927 Scanners not closed properly in certain circumstances
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-1760 Cleanup TODOs in HTable
|
||||
|
|
|
@ -46,7 +46,8 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
|
|||
private KVScannerComparator comparator;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
* Constructor. This KeyValueHeap will handle closing of passed in
|
||||
* KeyValueScanners.
|
||||
* @param scanners
|
||||
* @param comparator
|
||||
*/
|
||||
|
@ -57,6 +58,8 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
|
|||
for (KeyValueScanner scanner : scanners) {
|
||||
if (scanner.peek() != null) {
|
||||
this.heap.add(scanner);
|
||||
} else {
|
||||
scanner.close();
|
||||
}
|
||||
}
|
||||
this.current = heap.poll();
|
||||
|
|
|
@ -154,9 +154,46 @@ implements HConstants {
|
|||
|
||||
}
|
||||
|
||||
public void testScannerLeak() {
|
||||
// Test for unclosed scanners (HBASE-1927)
|
||||
|
||||
List<KeyValue> l1 = new ArrayList<KeyValue>();
|
||||
l1.add(new KeyValue(row1, fam1, col5, data));
|
||||
l1.add(new KeyValue(row2, fam1, col1, data));
|
||||
l1.add(new KeyValue(row2, fam1, col2, data));
|
||||
scanners.add(new Scanner(l1));
|
||||
|
||||
List<KeyValue> l2 = new ArrayList<KeyValue>();
|
||||
l2.add(new KeyValue(row1, fam1, col1, data));
|
||||
l2.add(new KeyValue(row1, fam1, col2, data));
|
||||
scanners.add(new Scanner(l2));
|
||||
|
||||
List<KeyValue> l3 = new ArrayList<KeyValue>();
|
||||
l3.add(new KeyValue(row1, fam1, col3, data));
|
||||
l3.add(new KeyValue(row1, fam1, col4, data));
|
||||
l3.add(new KeyValue(row1, fam2, col1, data));
|
||||
l3.add(new KeyValue(row1, fam2, col2, data));
|
||||
l3.add(new KeyValue(row2, fam1, col3, data));
|
||||
scanners.add(new Scanner(l3));
|
||||
|
||||
List<KeyValue> l4 = new ArrayList<KeyValue>();
|
||||
scanners.add(new Scanner(l4));
|
||||
|
||||
//Creating KeyValueHeap
|
||||
KeyValueHeap kvh =
|
||||
new KeyValueHeap(scanners.toArray(new Scanner[0]), KeyValue.COMPARATOR);
|
||||
|
||||
while(kvh.next() != null);
|
||||
|
||||
for(Scanner scanner : scanners) {
|
||||
assertTrue(scanner.isClosed());
|
||||
}
|
||||
}
|
||||
|
||||
private static class Scanner implements KeyValueScanner {
|
||||
private Iterator<KeyValue> iter;
|
||||
private KeyValue current;
|
||||
private boolean closed = false;
|
||||
|
||||
public Scanner(List<KeyValue> list) {
|
||||
Collections.sort(list, KeyValue.COMPARATOR);
|
||||
|
@ -180,7 +217,13 @@ implements HConstants {
|
|||
return oldCurrent;
|
||||
}
|
||||
|
||||
public void close(){}
|
||||
public void close(){
|
||||
closed = true;
|
||||
}
|
||||
|
||||
public boolean isClosed() {
|
||||
return closed;
|
||||
}
|
||||
|
||||
public boolean seek(KeyValue seekKv) {
|
||||
while(iter.hasNext()){
|
||||
|
|
Loading…
Reference in New Issue