HBASE-3416 For intra-row scanning, the update readers notification resets the query matcher and can lead to incorrect behavior
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1066744 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
37f7a9f1f3
commit
a3a2ce9215
|
@ -36,6 +36,9 @@ Release 0.91.0 - Unreleased
|
|||
notify call (Bruno Dumon via Stack)
|
||||
HBASE-3494 checkAndPut implementation doesnt verify row param and writable
|
||||
row are the same
|
||||
HBASE-3416 For intra-row scanning, the update readers notification resets
|
||||
the query matcher and can lead to incorrect behavior
|
||||
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-2001 Coprocessors: Colocate user code with regions (Mingjie Lai via
|
||||
|
|
|
@ -374,10 +374,17 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
|||
// Combine all seeked scanners with a heap
|
||||
heap = new KeyValueHeap(scanners, store.comparator);
|
||||
|
||||
// Reset the state of the Query Matcher and set to top row
|
||||
matcher.reset();
|
||||
// Reset the state of the Query Matcher and set to top row.
|
||||
// Only reset and call setRow if the row changes; avoids confusing the
|
||||
// query matcher if scanning intra-row.
|
||||
KeyValue kv = heap.peek();
|
||||
matcher.setRow((kv == null ? lastTopKey : kv).getRow());
|
||||
if (kv == null) {
|
||||
kv = lastTopKey;
|
||||
}
|
||||
if ((matcher.row == null) || !kv.matchingRow(matcher.row)) {
|
||||
matcher.reset();
|
||||
matcher.setRow(kv.getRow());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
|
@ -51,17 +52,17 @@ public class TestWideScanner extends HBaseTestCase {
|
|||
new HTableDescriptor("testwidescan");
|
||||
static {
|
||||
TESTTABLEDESC.addFamily(new HColumnDescriptor(A,
|
||||
10, // Ten is arbitrary number. Keep versions to help debuggging.
|
||||
100, // Keep versions to help debuggging.
|
||||
Compression.Algorithm.NONE.getName(), false, true, 8 * 1024,
|
||||
HConstants.FOREVER, StoreFile.BloomType.NONE.toString(),
|
||||
HColumnDescriptor.DEFAULT_REPLICATION_SCOPE));
|
||||
TESTTABLEDESC.addFamily(new HColumnDescriptor(B,
|
||||
10, // Ten is arbitrary number. Keep versions to help debuggging.
|
||||
100, // Keep versions to help debuggging.
|
||||
Compression.Algorithm.NONE.getName(), false, true, 8 * 1024,
|
||||
HConstants.FOREVER, StoreFile.BloomType.NONE.toString(),
|
||||
HColumnDescriptor.DEFAULT_REPLICATION_SCOPE));
|
||||
TESTTABLEDESC.addFamily(new HColumnDescriptor(C,
|
||||
10, // Ten is arbitrary number. Keep versions to help debuggging.
|
||||
100, // Keep versions to help debuggging.
|
||||
Compression.Algorithm.NONE.getName(), false, true, 8 * 1024,
|
||||
HConstants.FOREVER, StoreFile.BloomType.NONE.toString(),
|
||||
HColumnDescriptor.DEFAULT_REPLICATION_SCOPE));
|
||||
|
@ -88,19 +89,23 @@ public class TestWideScanner extends HBaseTestCase {
|
|||
int count = 0;
|
||||
for (char c = 'a'; c <= 'c'; c++) {
|
||||
byte[] row = Bytes.toBytes("ab" + c);
|
||||
int i;
|
||||
for (i = 0; i < 2500; i++) {
|
||||
int i, j;
|
||||
long ts = System.currentTimeMillis();
|
||||
for (i = 0; i < 100; i++) {
|
||||
byte[] b = Bytes.toBytes(String.format("%10d", i));
|
||||
for (j = 0; j < 100; j++) {
|
||||
Put put = new Put(row);
|
||||
put.add(COLUMNS[rng.nextInt(COLUMNS.length)], b, b);
|
||||
put.add(COLUMNS[rng.nextInt(COLUMNS.length)], b, ++ts, b);
|
||||
region.put(put);
|
||||
count++;
|
||||
}
|
||||
}
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
public void testWideScanBatching() throws IOException {
|
||||
final int batch = 256;
|
||||
try {
|
||||
this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
|
||||
int inserted = addWideContent(this.r);
|
||||
|
@ -109,7 +114,8 @@ public class TestWideScanner extends HBaseTestCase {
|
|||
scan.addFamily(A);
|
||||
scan.addFamily(B);
|
||||
scan.addFamily(C);
|
||||
scan.setBatch(1000);
|
||||
scan.setMaxVersions(100);
|
||||
scan.setBatch(batch);
|
||||
InternalScanner s = r.getScanner(scan);
|
||||
int total = 0;
|
||||
int i = 0;
|
||||
|
@ -119,8 +125,8 @@ public class TestWideScanner extends HBaseTestCase {
|
|||
i++;
|
||||
LOG.info("iteration #" + i + ", results.size=" + results.size());
|
||||
|
||||
// assert that the result set is no larger than 1000
|
||||
assertTrue(results.size() <= 1000);
|
||||
// assert that the result set is no larger
|
||||
assertTrue(results.size() <= batch);
|
||||
|
||||
total += results.size();
|
||||
|
||||
|
@ -133,11 +139,19 @@ public class TestWideScanner extends HBaseTestCase {
|
|||
}
|
||||
|
||||
results.clear();
|
||||
|
||||
// trigger ChangedReadersObservers
|
||||
Iterator<KeyValueScanner> scanners =
|
||||
((HRegion.RegionScanner)s).storeHeap.getHeap().iterator();
|
||||
while (scanners.hasNext()) {
|
||||
StoreScanner ss = (StoreScanner)scanners.next();
|
||||
ss.updateReaders();
|
||||
}
|
||||
} while (more);
|
||||
|
||||
// assert that the scanner returned all values
|
||||
LOG.info("inserted " + inserted + ", scanned " + total);
|
||||
assertTrue(total == inserted);
|
||||
assertEquals(total, inserted);
|
||||
|
||||
s.close();
|
||||
} finally {
|
||||
|
|
Loading…
Reference in New Issue