From 057499474c346b28ad5ac3ab7da420814eba547d Mon Sep 17 00:00:00 2001 From: Jonathan Lawlor Date: Wed, 1 Apr 2015 13:05:30 -0700 Subject: [PATCH] HBASE-13374 Small scanners (with particular configurations) do not return all rows Signed-off-by: Enis Soztutar --- .../client/ClientSmallReversedScanner.java | 15 +--- .../hbase/client/ClientSmallScanner.java | 17 +--- .../client/TestScannersFromClientSide.java | 83 +++++++++++++++++++ 3 files changed, 91 insertions(+), 24 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java index ac0837ed6f4..28502dc0244 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java @@ -49,7 +49,6 @@ import com.google.common.annotations.VisibleForTesting; public class ClientSmallReversedScanner extends ReversedClientScanner { private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class); private ScannerCallableWithReplicas smallScanCallable = null; - private byte[] skipRowOfFirstResult = null; private SmallScannerCallableFactory callableFactory; /** @@ -135,7 +134,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { // Where to start the next getter byte[] localStartKey; int cacheNum = nbRows; - skipRowOfFirstResult = null; + boolean regionChanged = true; // if we're at end of table, close and return false to stop iterating if (this.currentRegion != null && currentRegionDone) { byte[] startKey = this.currentRegion.getStartKey(); @@ -154,9 +153,8 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { LOG.debug("Finished with region " + this.currentRegion); } } else if (this.lastResult != null) { - localStartKey = this.lastResult.getRow(); - skipRowOfFirstResult = this.lastResult.getRow(); - cacheNum++; + regionChanged = false; + localStartKey = createClosestRowBefore(lastResult.getRow()); } else { localStartKey = this.scan.getStartRow(); } @@ -170,7 +168,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(), getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller); - if (this.scanMetrics != null && skipRowOfFirstResult == null) { + if (this.scanMetrics != null && regionChanged) { this.scanMetrics.countOfRegions.incrementAndGet(); } return true; @@ -221,11 +219,6 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { if (values != null && values.length > 0) { for (int i = 0; i < values.length; i++) { Result rs = values[i]; - if (i == 0 && this.skipRowOfFirstResult != null - && Bytes.equals(skipRowOfFirstResult, rs.getRow())) { - // Skip the first result - continue; - } cache.add(rs); // We don't make Iterator here for (Cell cell : rs.rawCells()) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java index 1c6be16c5fa..77321b012db 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java @@ -53,9 +53,6 @@ import com.google.protobuf.ServiceException; public class ClientSmallScanner extends ClientScanner { private final Log LOG = LogFactory.getLog(this.getClass()); private ScannerCallableWithReplicas smallScanCallable = null; - // When fetching results from server, skip the first result if it has the same - // row with this one - private byte[] skipRowOfFirstResult = null; private SmallScannerCallableFactory callableFactory; /** @@ -144,7 +141,7 @@ public class ClientSmallScanner extends ClientScanner { // Where to start the next getter byte[] localStartKey; int cacheNum = nbRows; - skipRowOfFirstResult = null; + boolean regionChanged = true; // if we're at end of table, close and return false to stop iterating if (this.currentRegion != null && currentRegionDone) { byte[] endKey = this.currentRegion.getEndKey(); @@ -161,9 +158,8 @@ public class ClientSmallScanner extends ClientScanner { LOG.trace("Finished with region " + this.currentRegion); } } else if (this.lastResult != null) { - localStartKey = this.lastResult.getRow(); - skipRowOfFirstResult = this.lastResult.getRow(); - cacheNum++; + regionChanged = false; + localStartKey = Bytes.add(lastResult.getRow(), new byte[1]); } else { localStartKey = this.scan.getStartRow(); } @@ -175,7 +171,7 @@ public class ClientSmallScanner extends ClientScanner { smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan, getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(), getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller); - if (this.scanMetrics != null && skipRowOfFirstResult == null) { + if (this.scanMetrics != null && regionChanged) { this.scanMetrics.countOfRegions.incrementAndGet(); } return true; @@ -269,11 +265,6 @@ public class ClientSmallScanner extends ClientScanner { if (values != null && values.length > 0) { for (int i = 0; i < values.length; i++) { Result rs = values[i]; - if (i == 0 && this.skipRowOfFirstResult != null - && Bytes.equals(skipRowOfFirstResult, rs.getRow())) { - // Skip the first result - continue; - } cache.add(rs); // We don't make Iterator here for (Cell cell : rs.rawCells()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java index a6c1cfef3e8..1e939335a04 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java @@ -24,8 +24,10 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTestConst; @@ -68,6 +70,8 @@ public class TestScannersFromClientSide { */ @BeforeClass public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024); TEST_UTIL.startMiniCluster(3); } @@ -170,6 +174,85 @@ public class TestScannersFromClientSide { } + @Test + public void testSmallScan() throws Exception { + TableName TABLE = TableName.valueOf("testSmallScan"); + + int numRows = 10; + byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows); + + int numQualifiers = 10; + byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, numQualifiers); + + Table ht = TEST_UTIL.createTable(TABLE, FAMILY); + + Put put; + List puts = new ArrayList(); + for (int row = 0; row < ROWS.length; row++) { + put = new Put(ROWS[row]); + for (int qual = 0; qual < QUALIFIERS.length; qual++) { + KeyValue kv = new KeyValue(ROWS[row], FAMILY, QUALIFIERS[qual], VALUE); + put.add(kv); + } + puts.add(put); + } + ht.put(puts); + + int expectedRows = numRows; + int expectedCols = numRows * numQualifiers; + + // Test normal and reversed + testSmallScan(ht, true, expectedRows, expectedCols); + testSmallScan(ht, false, expectedRows, expectedCols); + } + + /** + * Run through a variety of test configurations with a small scan + * @param table + * @param reversed + * @param rows + * @param columns + * @throws Exception + */ + public void testSmallScan(Table table, boolean reversed, int rows, int columns) throws Exception { + Scan baseScan = new Scan(); + baseScan.setReversed(reversed); + baseScan.setSmall(true); + + Scan scan = new Scan(baseScan); + verifyExpectedCounts(table, scan, rows, columns); + + scan = new Scan(baseScan); + scan.setMaxResultSize(1); + verifyExpectedCounts(table, scan, rows, columns); + + scan = new Scan(baseScan); + scan.setMaxResultSize(1); + scan.setCaching(Integer.MAX_VALUE); + verifyExpectedCounts(table, scan, rows, columns); + } + + private void verifyExpectedCounts(Table table, Scan scan, int expectedRowCount, + int expectedCellCount) throws Exception { + ResultScanner scanner = table.getScanner(scan); + + int rowCount = 0; + int cellCount = 0; + Result r = null; + while ((r = scanner.next()) != null) { + rowCount++; + for (Cell c : r.rawCells()) { + cellCount++; + } + } + + assertTrue("Expected row count: " + expectedRowCount + " Actual row count: " + rowCount, + expectedRowCount == rowCount); + assertTrue("Expected cell count: " + expectedCellCount + " Actual cell count: " + cellCount, + expectedCellCount == cellCount); + scanner.close(); + } + /** * Test from client side for get with maxResultPerCF set *