diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 9a37ef30e9c..74c6b0ddfa5 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1921,7 +1921,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ * It is used to combine scanners from multiple Stores (aka column families). */ class RegionScanner implements InternalScanner { - private final KeyValueHeap storeHeap; + private KeyValueHeap storeHeap = null; private final byte [] stopRow; private Filter filter; private List results = new ArrayList(); @@ -1929,11 +1929,11 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ private int batch; // Doesn't need to be volatile, always accessed under a sync'ed method private boolean filterClosed = false; + private Scan theScan = null; + private List extraScanners = null; RegionScanner(Scan scan, List additionalScanners) { - ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); - - //DebugPrint.println("HRegionScanner., threadpoint = " + ReadWriteConsistencyControl.getThreadReadPoint()); + //DebugPrint.println("HRegionScanner."); this.filter = scan.getFilter(); this.batch = scan.getBatch(); @@ -1945,24 +1945,30 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ // If we are doing a get, we want to be [startRow,endRow] normally // it is [startRow,endRow) and if startRow=endRow we get nothing. this.isScan = scan.isGetScan() ? -1 : 0; - - List scanners = new ArrayList(); - if (additionalScanners != null) { - scanners.addAll(additionalScanners); - } - for (Map.Entry> entry : - scan.getFamilyMap().entrySet()) { - Store store = stores.get(entry.getKey()); - scanners.add(store.getScanner(scan, entry.getValue())); - } - this.storeHeap = - new KeyValueHeap(scanners.toArray(new KeyValueScanner[0]), comparator); + this.theScan = scan; + this.extraScanners = additionalScanners; } RegionScanner(Scan scan) { this(scan, null); } + void initHeap() { + List scanners = new ArrayList(); + if (extraScanners != null) { + scanners.addAll(extraScanners); + } + + for (Map.Entry> entry : + theScan.getFamilyMap().entrySet()) { + Store store = stores.get(entry.getKey()); + scanners.add(store.getScanner(theScan, entry.getValue())); + } + this.storeHeap = + new KeyValueHeap(scanners.toArray(new KeyValueScanner[0]), comparator); + } + + private void resetFilters() { if (filter != null) { filter.reset(); @@ -1987,6 +1993,11 @@ public class HRegion implements HConstants, HeapSize { // , Writable{ // This could be a new thread from the last time we called next(). ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); + // lazy init the store heap. + if (storeHeap == null) { + initHeap(); + } + results.clear(); boolean returnResult = nextInternal(limit); if (!returnResult && filter != null && filter.filterRow()) { diff --git a/core/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java b/core/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java index 42587372724..83be0974c0d 100644 --- a/core/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java +++ b/core/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java @@ -672,4 +672,14 @@ public abstract class HBaseTestCase extends TestCase { Bytes.toString(actual) + ">"); } } + + public static void assertEquals(byte[] expected, + byte[] actual) { + if (Bytes.compareTo(expected, actual) != 0) { + throw new AssertionFailedError("expected:<" + + Bytes.toStringBinary(expected) + "> but was:<" + + Bytes.toStringBinary(actual) + ">"); + } + } + } diff --git a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 2a4f7785315..6d51b8f14b1 100644 --- a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -1025,19 +1025,21 @@ public class TestHRegion extends HBaseTestCase { region.put(put); Scan scan = null; - InternalScanner is = null; - - //Testing to see how many scanners that is produced by getScanner, starting + HRegion.RegionScanner is = null; + + //Testing to see how many scanners that is produced by getScanner, starting //with known number, 2 - current = 1 scan = new Scan(); scan.addFamily(fam2); scan.addFamily(fam4); - is = region.getScanner(scan); + is = (RegionScanner) region.getScanner(scan); + is.initHeap(); // i dont like this test assertEquals(1, ((RegionScanner)is).getStoreHeap().getHeap().size()); scan = new Scan(); - is = region.getScanner(scan); - assertEquals(families.length -1, + is = (RegionScanner) region.getScanner(scan); + is.initHeap(); + assertEquals(families.length -1, ((RegionScanner)is).getStoreHeap().getHeap().size()); } @@ -2185,6 +2187,15 @@ public class TestHRegion extends HBaseTestCase { } Assert.assertTrue(timestamp >= prevTimestamp); prevTimestamp = timestamp; + + byte [] gotValue = null; + for (KeyValue kv : result.raw()) { + byte [] thisValue = kv.getValue(); + if (gotValue != null) { + assertEquals(gotValue, thisValue); + } + gotValue = thisValue; + } } } diff --git a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java index 18bb3710bfd..9d0d3a9bc83 100644 --- a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java +++ b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java @@ -298,6 +298,7 @@ public class TestMemStore extends TestCase { rwcc.completeMemstoreInsert(w); // Assert that we can read back + ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); KeyValueScanner s = this.memstore.getScanners()[0]; s.seek(kv); @@ -310,7 +311,7 @@ public class TestMemStore extends TestCase { } } - public void no_testReadOwnWritesUnderConcurrency() throws Throwable { + public void testReadOwnWritesUnderConcurrency() throws Throwable { int NUM_THREADS = 8;