HBASE-21621 Reversed scan does not return expected number of rows
The unit test is contributed by Nihal Jain
This commit is contained in:
parent
b2bf22e209
commit
7c0a3cc265
|
@ -59,10 +59,9 @@ public class ReversedStoreScanner extends StoreScanner implements KeyValueScanne
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
|
protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners,
|
||||||
CellComparator comparator) throws IOException {
|
CellComparator comparator) throws IOException {
|
||||||
// Combine all seeked scanners with a heap
|
return new ReversedKeyValueHeap(scanners, comparator);
|
||||||
heap = new ReversedKeyValueHeap(scanners, comparator);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -403,10 +403,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
|
protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
|
||||||
CellComparator comparator) throws IOException {
|
CellComparator comparator) throws IOException {
|
||||||
// Combine all seeked scanners with a heap
|
// Combine all seeked scanners with a heap
|
||||||
heap = new KeyValueHeap(scanners, comparator);
|
heap = newKVHeap(scanners, comparator);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners,
|
||||||
|
CellComparator comparator) throws IOException {
|
||||||
|
return new KeyValueHeap(scanners, comparator);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1037,7 +1043,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
|
newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
|
||||||
newCurrentScanners.addAll(fileScanners);
|
newCurrentScanners.addAll(fileScanners);
|
||||||
newCurrentScanners.addAll(memstoreScanners);
|
newCurrentScanners.addAll(memstoreScanners);
|
||||||
newHeap = new KeyValueHeap(newCurrentScanners, comparator);
|
newHeap = newKVHeap(newCurrentScanners, comparator);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.warn("failed to switch to stream read", e);
|
LOG.warn("failed to switch to stream read", e);
|
||||||
if (fileScanners != null) {
|
if (fileScanners != null) {
|
||||||
|
|
|
@ -1059,7 +1059,7 @@ public class TestFromClientSide3 {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static byte[] generateHugeValue(int size) {
|
static byte[] generateHugeValue(int size) {
|
||||||
Random rand = ThreadLocalRandom.current();
|
Random rand = ThreadLocalRandom.current();
|
||||||
byte[] value = new byte[size];
|
byte[] value = new byte[size];
|
||||||
for (int i = 0; i < value.length; i++) {
|
for (int i = 0; i < value.length; i++) {
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
import static org.apache.hadoop.hbase.HConstants.RPC_CODEC_CONF_KEY;
|
import static org.apache.hadoop.hbase.HConstants.RPC_CODEC_CONF_KEY;
|
||||||
|
import static org.apache.hadoop.hbase.client.TestFromClientSide3.generateHugeValue;
|
||||||
import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_CODEC_CLASS;
|
import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_CODEC_CLASS;
|
||||||
import static org.junit.Assert.assertArrayEquals;
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
@ -935,4 +936,46 @@ public class TestScannersFromClientSide {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReverseScanWithFlush() throws Exception {
|
||||||
|
TableName tableName = TableName.valueOf(name.getMethodName());
|
||||||
|
final int BATCH_SIZE = 10;
|
||||||
|
final int ROWS_TO_INSERT = 100;
|
||||||
|
final byte[] LARGE_VALUE = generateHugeValue(128 * 1024);
|
||||||
|
|
||||||
|
try (Table table = TEST_UTIL.createTable(tableName, FAMILY);
|
||||||
|
Admin admin = TEST_UTIL.getAdmin()) {
|
||||||
|
List<Put> putList = new ArrayList<>();
|
||||||
|
for (long i = 0; i < ROWS_TO_INSERT; i++) {
|
||||||
|
Put put = new Put(Bytes.toBytes(i));
|
||||||
|
put.addColumn(FAMILY, QUALIFIER, LARGE_VALUE);
|
||||||
|
putList.add(put);
|
||||||
|
|
||||||
|
if (putList.size() >= BATCH_SIZE) {
|
||||||
|
table.put(putList);
|
||||||
|
admin.flush(tableName);
|
||||||
|
putList.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!putList.isEmpty()) {
|
||||||
|
table.put(putList);
|
||||||
|
admin.flush(tableName);
|
||||||
|
putList.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
Scan scan = new Scan();
|
||||||
|
scan.setReversed(true);
|
||||||
|
int count = 0;
|
||||||
|
|
||||||
|
try (ResultScanner results = table.getScanner(scan)) {
|
||||||
|
for (Result result : results) {
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertEquals("Expected " + ROWS_TO_INSERT + " rows in the table but it is " + count,
|
||||||
|
ROWS_TO_INSERT, count);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -170,7 +170,13 @@ public class TestStoreScanner {
|
||||||
if (count == null) {
|
if (count == null) {
|
||||||
count = new AtomicInteger(0);
|
count = new AtomicInteger(0);
|
||||||
}
|
}
|
||||||
heap = new KeyValueHeapWithCount(scanners, comparator, count);
|
heap = newKVHeap(scanners, comparator);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners,
|
||||||
|
CellComparator comparator) throws IOException {
|
||||||
|
return new KeyValueHeapWithCount(scanners, comparator, count);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue