HBASE-13835 KeyValueHeap.current might be in heap when exception happens in pollRealKV. (zhouyingchao)
This commit is contained in:
parent
edef3d64bc
commit
d9ba4d5bb5
|
@ -113,12 +113,14 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
|
|||
Cell kvNext = this.current.peek();
|
||||
if (kvNext == null) {
|
||||
this.scannersForDelayedClose.add(this.current);
|
||||
this.current = null;
|
||||
this.current = pollRealKV();
|
||||
} else {
|
||||
KeyValueScanner topScanner = this.heap.peek();
|
||||
// no need to add current back to the heap if it is the only scanner left
|
||||
if (topScanner != null && this.comparator.compare(kvNext, topScanner.peek()) >= 0) {
|
||||
this.heap.add(this.current);
|
||||
this.current = null;
|
||||
this.current = pollRealKV();
|
||||
}
|
||||
}
|
||||
|
@ -162,6 +164,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
|
|||
} else {
|
||||
this.heap.add(this.current);
|
||||
}
|
||||
this.current = null;
|
||||
this.current = pollRealKV();
|
||||
if (this.current == null) {
|
||||
moreCells = scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
|
||||
|
@ -348,7 +351,13 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
|
|||
|
||||
while (kvScanner != null && !kvScanner.realSeekDone()) {
|
||||
if (kvScanner.peek() != null) {
|
||||
kvScanner.enforceSeek();
|
||||
try {
|
||||
kvScanner.enforceSeek();
|
||||
} catch (IOException ioe) {
|
||||
// Add the item to delayed close set in case it is leak from close
|
||||
this.scannersForDelayedClose.add(kvScanner);
|
||||
throw ioe;
|
||||
}
|
||||
Cell curKV = kvScanner.peek();
|
||||
if (curKV != null) {
|
||||
KeyValueScanner nextEarliestScanner = heap.peek();
|
||||
|
|
|
@ -136,12 +136,14 @@ public class ReversedKeyValueHeap extends KeyValueHeap {
|
|||
} else {
|
||||
this.scannersForDelayedClose.add(this.current);
|
||||
}
|
||||
this.current = null;
|
||||
this.current = pollRealKV();
|
||||
} else {
|
||||
KeyValueScanner topScanner = this.heap.peek();
|
||||
if (topScanner != null
|
||||
&& this.comparator.compare(this.current, topScanner) > 0) {
|
||||
this.heap.add(this.current);
|
||||
this.current = null;
|
||||
this.current = pollRealKV();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -229,6 +229,58 @@ public class TestKeyValueHeap extends HBaseTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScannerException() throws IOException {
|
||||
// Test for NPE issue when exception happens in scanners (HBASE-13835)
|
||||
|
||||
List<Cell> l1 = new ArrayList<Cell>();
|
||||
l1.add(new KeyValue(row1, fam1, col5, data));
|
||||
l1.add(new KeyValue(row2, fam1, col1, data));
|
||||
l1.add(new KeyValue(row2, fam1, col2, data));
|
||||
SeekScanner s1 = new SeekScanner(l1);
|
||||
scanners.add(s1);
|
||||
|
||||
List<Cell> l2 = new ArrayList<Cell>();
|
||||
l2.add(new KeyValue(row1, fam1, col1, data));
|
||||
l2.add(new KeyValue(row1, fam1, col2, data));
|
||||
SeekScanner s2 = new SeekScanner(l2);
|
||||
scanners.add(s2);
|
||||
|
||||
List<Cell> l3 = new ArrayList<Cell>();
|
||||
l3.add(new KeyValue(row1, fam1, col3, data));
|
||||
l3.add(new KeyValue(row1, fam1, col4, data));
|
||||
l3.add(new KeyValue(row1, fam2, col1, data));
|
||||
l3.add(new KeyValue(row1, fam2, col2, data));
|
||||
l3.add(new KeyValue(row2, fam1, col3, data));
|
||||
SeekScanner s3 = new SeekScanner(l3);
|
||||
scanners.add(s3);
|
||||
|
||||
List<Cell> l4 = new ArrayList<Cell>();
|
||||
SeekScanner s4 = new SeekScanner(l4);
|
||||
scanners.add(s4);
|
||||
|
||||
// Creating KeyValueHeap
|
||||
KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparator.COMPARATOR);
|
||||
|
||||
try {
|
||||
for (KeyValueScanner scanner : scanners) {
|
||||
((SeekScanner) scanner).setRealSeekDone(false);
|
||||
}
|
||||
while (kvh.next() != null);
|
||||
// The pollRealKV should throw IOE.
|
||||
assertTrue(false);
|
||||
} catch (IOException ioe) {
|
||||
kvh.close();
|
||||
}
|
||||
|
||||
// It implies there is no NPE thrown from kvh.close() if getting here
|
||||
for (KeyValueScanner scanner : scanners) {
|
||||
// Verify that close is called and only called once for each scanner
|
||||
assertTrue(((SeekScanner) scanner).isClosed());
|
||||
assertEquals(((SeekScanner) scanner).getClosedNum(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
private static class Scanner extends CollectionBackedScanner {
|
||||
private Iterator<Cell> iter;
|
||||
private Cell current;
|
||||
|
@ -238,6 +290,7 @@ public class TestKeyValueHeap extends HBaseTestCase {
|
|||
super(list);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(){
|
||||
closed = true;
|
||||
}
|
||||
|
@ -247,6 +300,36 @@ public class TestKeyValueHeap extends HBaseTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private static class SeekScanner extends Scanner {
|
||||
private int closedNum = 0;
|
||||
private boolean realSeekDone = true;
|
||||
|
||||
public SeekScanner(List<Cell> list) {
|
||||
super(list);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
super.close();
|
||||
closedNum++;
|
||||
}
|
||||
|
||||
public int getClosedNum() {
|
||||
return closedNum;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean realSeekDone() {
|
||||
return realSeekDone;
|
||||
}
|
||||
|
||||
public void setRealSeekDone(boolean done) {
|
||||
realSeekDone = done;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void enforceSeek() throws IOException {
|
||||
throw new IOException("enforceSeek must not be called on a " + "non-lazy scanner");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue