From d9ba4d5bb513624fef8787f04b18a57ac5eb5203 Mon Sep 17 00:00:00 2001 From: anoopsjohn Date: Thu, 25 Jun 2015 23:44:37 +0530 Subject: [PATCH] HBASE-13835 KeyValueHeap.current might be in heap when exception happens in pollRealKV. (zhouyingchao) --- .../hbase/regionserver/KeyValueHeap.java | 11 ++- .../regionserver/ReversedKeyValueHeap.java | 2 + .../hbase/regionserver/TestKeyValueHeap.java | 85 ++++++++++++++++++- 3 files changed, 96 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index 2b9d0f5f21a..7483568628b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -113,12 +113,14 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner Cell kvNext = this.current.peek(); if (kvNext == null) { this.scannersForDelayedClose.add(this.current); + this.current = null; this.current = pollRealKV(); } else { KeyValueScanner topScanner = this.heap.peek(); // no need to add current back to the heap if it is the only scanner left if (topScanner != null && this.comparator.compare(kvNext, topScanner.peek()) >= 0) { this.heap.add(this.current); + this.current = null; this.current = pollRealKV(); } } @@ -162,6 +164,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner } else { this.heap.add(this.current); } + this.current = null; this.current = pollRealKV(); if (this.current == null) { moreCells = scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); @@ -348,7 +351,13 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner while (kvScanner != null && !kvScanner.realSeekDone()) { if (kvScanner.peek() != null) { - kvScanner.enforceSeek(); + try { + kvScanner.enforceSeek(); + } catch (IOException ioe) { + // Add the item to delayed close set in case it is leak from close + this.scannersForDelayedClose.add(kvScanner); + throw ioe; + } Cell curKV = kvScanner.peek(); if (curKV != null) { KeyValueScanner nextEarliestScanner = heap.peek(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java index 69141327670..2035902341f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedKeyValueHeap.java @@ -136,12 +136,14 @@ public class ReversedKeyValueHeap extends KeyValueHeap { } else { this.scannersForDelayedClose.add(this.current); } + this.current = null; this.current = pollRealKV(); } else { KeyValueScanner topScanner = this.heap.peek(); if (topScanner != null && this.comparator.compare(this.current, topScanner) > 0) { this.heap.add(this.current); + this.current = null; this.current = pollRealKV(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java index 5b0ab3bb99f..aff40c1b141 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java @@ -229,6 +229,58 @@ public class TestKeyValueHeap extends HBaseTestCase { } } + @Test + public void testScannerException() throws IOException { + // Test for NPE issue when exception happens in scanners (HBASE-13835) + + List l1 = new ArrayList(); + l1.add(new KeyValue(row1, fam1, col5, data)); + l1.add(new KeyValue(row2, fam1, col1, data)); + l1.add(new KeyValue(row2, fam1, col2, data)); + SeekScanner s1 = new SeekScanner(l1); + scanners.add(s1); + + List l2 = new ArrayList(); + l2.add(new KeyValue(row1, fam1, col1, data)); + l2.add(new KeyValue(row1, fam1, col2, data)); + SeekScanner s2 = new SeekScanner(l2); + scanners.add(s2); + + List l3 = new ArrayList(); + l3.add(new KeyValue(row1, fam1, col3, data)); + l3.add(new KeyValue(row1, fam1, col4, data)); + l3.add(new KeyValue(row1, fam2, col1, data)); + l3.add(new KeyValue(row1, fam2, col2, data)); + l3.add(new KeyValue(row2, fam1, col3, data)); + SeekScanner s3 = new SeekScanner(l3); + scanners.add(s3); + + List l4 = new ArrayList(); + SeekScanner s4 = new SeekScanner(l4); + scanners.add(s4); + + // Creating KeyValueHeap + KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparator.COMPARATOR); + + try { + for (KeyValueScanner scanner : scanners) { + ((SeekScanner) scanner).setRealSeekDone(false); + } + while (kvh.next() != null); + // The pollRealKV should throw IOE. + assertTrue(false); + } catch (IOException ioe) { + kvh.close(); + } + + // It implies there is no NPE thrown from kvh.close() if getting here + for (KeyValueScanner scanner : scanners) { + // Verify that close is called and only called once for each scanner + assertTrue(((SeekScanner) scanner).isClosed()); + assertEquals(((SeekScanner) scanner).getClosedNum(), 1); + } + } + private static class Scanner extends CollectionBackedScanner { private Iterator iter; private Cell current; @@ -238,6 +290,7 @@ public class TestKeyValueHeap extends HBaseTestCase { super(list); } + @Override public void close(){ closed = true; } @@ -247,6 +300,36 @@ public class TestKeyValueHeap extends HBaseTestCase { } } + private static class SeekScanner extends Scanner { + private int closedNum = 0; + private boolean realSeekDone = true; + public SeekScanner(List list) { + super(list); + } + + @Override + public void close() { + super.close(); + closedNum++; + } + + public int getClosedNum() { + return closedNum; + } + + @Override + public boolean realSeekDone() { + return realSeekDone; + } + + public void setRealSeekDone(boolean done) { + realSeekDone = done; + } + + @Override + public void enforceSeek() throws IOException { + throw new IOException("enforceSeek must not be called on a " + "non-lazy scanner"); + } + } } -