HBASE-2474 Bug in HBASE-2248 - mixed version reads (not allowed by spec)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@944532 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
adbcfc3846
commit
f65e61c7ab
|
@ -1921,7 +1921,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||
* It is used to combine scanners from multiple Stores (aka column families).
|
||||
*/
|
||||
class RegionScanner implements InternalScanner {
|
||||
private final KeyValueHeap storeHeap;
|
||||
private KeyValueHeap storeHeap = null;
|
||||
private final byte [] stopRow;
|
||||
private Filter filter;
|
||||
private List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
|
@ -1929,11 +1929,11 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||
private int batch;
|
||||
// Doesn't need to be volatile, always accessed under a sync'ed method
|
||||
private boolean filterClosed = false;
|
||||
private Scan theScan = null;
|
||||
private List<KeyValueScanner> extraScanners = null;
|
||||
|
||||
RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) {
|
||||
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
|
||||
|
||||
//DebugPrint.println("HRegionScanner.<init>, threadpoint = " + ReadWriteConsistencyControl.getThreadReadPoint());
|
||||
//DebugPrint.println("HRegionScanner.<init>");
|
||||
this.filter = scan.getFilter();
|
||||
this.batch = scan.getBatch();
|
||||
|
||||
|
@ -1945,24 +1945,30 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||
// If we are doing a get, we want to be [startRow,endRow] normally
|
||||
// it is [startRow,endRow) and if startRow=endRow we get nothing.
|
||||
this.isScan = scan.isGetScan() ? -1 : 0;
|
||||
|
||||
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
|
||||
if (additionalScanners != null) {
|
||||
scanners.addAll(additionalScanners);
|
||||
}
|
||||
for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
|
||||
scan.getFamilyMap().entrySet()) {
|
||||
Store store = stores.get(entry.getKey());
|
||||
scanners.add(store.getScanner(scan, entry.getValue()));
|
||||
}
|
||||
this.storeHeap =
|
||||
new KeyValueHeap(scanners.toArray(new KeyValueScanner[0]), comparator);
|
||||
this.theScan = scan;
|
||||
this.extraScanners = additionalScanners;
|
||||
}
|
||||
|
||||
RegionScanner(Scan scan) {
|
||||
this(scan, null);
|
||||
}
|
||||
|
||||
void initHeap() {
|
||||
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
|
||||
if (extraScanners != null) {
|
||||
scanners.addAll(extraScanners);
|
||||
}
|
||||
|
||||
for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
|
||||
theScan.getFamilyMap().entrySet()) {
|
||||
Store store = stores.get(entry.getKey());
|
||||
scanners.add(store.getScanner(theScan, entry.getValue()));
|
||||
}
|
||||
this.storeHeap =
|
||||
new KeyValueHeap(scanners.toArray(new KeyValueScanner[0]), comparator);
|
||||
}
|
||||
|
||||
|
||||
private void resetFilters() {
|
||||
if (filter != null) {
|
||||
filter.reset();
|
||||
|
@ -1987,6 +1993,11 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||
|
||||
// This could be a new thread from the last time we called next().
|
||||
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
|
||||
// lazy init the store heap.
|
||||
if (storeHeap == null) {
|
||||
initHeap();
|
||||
}
|
||||
|
||||
results.clear();
|
||||
boolean returnResult = nextInternal(limit);
|
||||
if (!returnResult && filter != null && filter.filterRow()) {
|
||||
|
|
|
@ -672,4 +672,14 @@ public abstract class HBaseTestCase extends TestCase {
|
|||
Bytes.toString(actual) + ">");
|
||||
}
|
||||
}
|
||||
|
||||
public static void assertEquals(byte[] expected,
|
||||
byte[] actual) {
|
||||
if (Bytes.compareTo(expected, actual) != 0) {
|
||||
throw new AssertionFailedError("expected:<" +
|
||||
Bytes.toStringBinary(expected) + "> but was:<" +
|
||||
Bytes.toStringBinary(actual) + ">");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1025,18 +1025,20 @@ public class TestHRegion extends HBaseTestCase {
|
|||
region.put(put);
|
||||
|
||||
Scan scan = null;
|
||||
InternalScanner is = null;
|
||||
HRegion.RegionScanner is = null;
|
||||
|
||||
//Testing to see how many scanners that is produced by getScanner, starting
|
||||
//with known number, 2 - current = 1
|
||||
scan = new Scan();
|
||||
scan.addFamily(fam2);
|
||||
scan.addFamily(fam4);
|
||||
is = region.getScanner(scan);
|
||||
is = (RegionScanner) region.getScanner(scan);
|
||||
is.initHeap(); // i dont like this test
|
||||
assertEquals(1, ((RegionScanner)is).getStoreHeap().getHeap().size());
|
||||
|
||||
scan = new Scan();
|
||||
is = region.getScanner(scan);
|
||||
is = (RegionScanner) region.getScanner(scan);
|
||||
is.initHeap();
|
||||
assertEquals(families.length -1,
|
||||
((RegionScanner)is).getStoreHeap().getHeap().size());
|
||||
}
|
||||
|
@ -2185,6 +2187,15 @@ public class TestHRegion extends HBaseTestCase {
|
|||
}
|
||||
Assert.assertTrue(timestamp >= prevTimestamp);
|
||||
prevTimestamp = timestamp;
|
||||
|
||||
byte [] gotValue = null;
|
||||
for (KeyValue kv : result.raw()) {
|
||||
byte [] thisValue = kv.getValue();
|
||||
if (gotValue != null) {
|
||||
assertEquals(gotValue, thisValue);
|
||||
}
|
||||
gotValue = thisValue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -298,6 +298,7 @@ public class TestMemStore extends TestCase {
|
|||
rwcc.completeMemstoreInsert(w);
|
||||
|
||||
// Assert that we can read back
|
||||
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
|
||||
|
||||
KeyValueScanner s = this.memstore.getScanners()[0];
|
||||
s.seek(kv);
|
||||
|
@ -310,7 +311,7 @@ public class TestMemStore extends TestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void no_testReadOwnWritesUnderConcurrency() throws Throwable {
|
||||
public void testReadOwnWritesUnderConcurrency() throws Throwable {
|
||||
|
||||
int NUM_THREADS = 8;
|
||||
|
||||
|
|
Loading…
Reference in New Issue