From 685a17a800cb8f8fb6b03a51336501fc2c7def66 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Thu, 20 Dec 2018 12:34:34 +0800 Subject: [PATCH] HBASE-21621 Reversed scan does not return expected number of rows The unit test is contributed by Nihal Jain --- .../regionserver/ReversedStoreScanner.java | 5 +-- .../hbase/regionserver/StoreScanner.java | 10 ++++- .../hbase/client/TestFromClientSide3.java | 2 +- .../client/TestScannersFromClientSide.java | 43 +++++++++++++++++++ .../hbase/regionserver/TestStoreScanner.java | 8 +++- 5 files changed, 61 insertions(+), 7 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java index 90e1129e8b4..491e6ef691c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java @@ -59,10 +59,9 @@ public class ReversedStoreScanner extends StoreScanner implements KeyValueScanne } @Override - protected void resetKVHeap(List scanners, + protected KeyValueHeap newKVHeap(List scanners, CellComparator comparator) throws IOException { - // Combine all seeked scanners with a heap - heap = new ReversedKeyValueHeap(scanners, comparator); + return new ReversedKeyValueHeap(scanners, comparator); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 9a613bb99f5..be226fed047 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -403,10 +403,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } } + @VisibleForTesting protected void resetKVHeap(List scanners, CellComparator comparator) throws IOException { // Combine all seeked scanners with a heap - heap = new KeyValueHeap(scanners, comparator); + heap = newKVHeap(scanners, comparator); + } + + protected KeyValueHeap newKVHeap(List scanners, + CellComparator comparator) throws IOException { + return new KeyValueHeap(scanners, comparator); } /** @@ -1027,7 +1033,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size()); newCurrentScanners.addAll(fileScanners); newCurrentScanners.addAll(memstoreScanners); - newHeap = new KeyValueHeap(newCurrentScanners, comparator); + newHeap = newKVHeap(newCurrentScanners, comparator); } catch (Exception e) { LOG.warn("failed to switch to stream read", e); if (fileScanners != null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index d55ce1bad05..b8bf97ca377 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -1055,7 +1055,7 @@ public class TestFromClientSide3 { } } - private static byte[] generateHugeValue(int size) { + static byte[] generateHugeValue(int size) { Random rand = ThreadLocalRandom.current(); byte[] value = new byte[size]; for (int i = 0; i < value.length; i++) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java index b91e205d428..af0248215cd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.HConstants.RPC_CODEC_CONF_KEY; +import static org.apache.hadoop.hbase.client.TestFromClientSide3.generateHugeValue; import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_CODEC_CLASS; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -935,4 +936,46 @@ public class TestScannersFromClientSide { } } } + + @Test + public void testReverseScanWithFlush() throws Exception { + TableName tableName = TableName.valueOf(name.getMethodName()); + final int BATCH_SIZE = 10; + final int ROWS_TO_INSERT = 100; + final byte[] LARGE_VALUE = generateHugeValue(128 * 1024); + + try (Table table = TEST_UTIL.createTable(tableName, FAMILY); + Admin admin = TEST_UTIL.getAdmin()) { + List putList = new ArrayList<>(); + for (long i = 0; i < ROWS_TO_INSERT; i++) { + Put put = new Put(Bytes.toBytes(i)); + put.addColumn(FAMILY, QUALIFIER, LARGE_VALUE); + putList.add(put); + + if (putList.size() >= BATCH_SIZE) { + table.put(putList); + admin.flush(tableName); + putList.clear(); + } + } + + if (!putList.isEmpty()) { + table.put(putList); + admin.flush(tableName); + putList.clear(); + } + + Scan scan = new Scan(); + scan.setReversed(true); + int count = 0; + + try (ResultScanner results = table.getScanner(scan)) { + for (Result result : results) { + count++; + } + } + assertEquals("Expected " + ROWS_TO_INSERT + " rows in the table but it is " + count, + ROWS_TO_INSERT, count); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java index 104d294e5fc..d5775da83a2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java @@ -170,7 +170,13 @@ public class TestStoreScanner { if (count == null) { count = new AtomicInteger(0); } - heap = new KeyValueHeapWithCount(scanners, comparator, count); + heap = newKVHeap(scanners, comparator); + } + + @Override + protected KeyValueHeap newKVHeap(List scanners, + CellComparator comparator) throws IOException { + return new KeyValueHeapWithCount(scanners, comparator, count); } @Override