From 78930691ca6b664a07a6995c60bb8238cf7edf96 Mon Sep 17 00:00:00 2001 From: nkeywal Date: Tue, 4 Mar 2014 09:06:27 +0000 Subject: [PATCH] HBASE-9999 Add support for small reverse scan git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1573949 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/client/ClientScanner.java | 1 - .../hbase/client/ClientSmallScanner.java | 12 +- .../apache/hadoop/hbase/client/HTable.java | 25 ++- .../hbase/client/ReversedClientScanner.java | 4 +- .../hbase/client/ReversedScannerCallable.java | 2 +- .../hbase/client/TestFromClientSide.java | 166 ++++++++++++++++++ 6 files changed, 194 insertions(+), 16 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 0290dcac922..574d937b46d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ExceptionUtil; /** * Implements the scanner interface for the HBase client. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java index a980ec968f4..dd20f0a4984 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java @@ -153,21 +153,23 @@ public class ClientSmallScanner extends ClientScanner { LOG.trace("Advancing internal small scanner to startKey at '" + Bytes.toStringBinary(localStartKey) + "'"); } - smallScanCallable = getSmallScanCallable(localStartKey, cacheNum); + smallScanCallable = getSmallScanCallable( + scan, getConnection(), getTable(), localStartKey, cacheNum); if (this.scanMetrics != null && skipRowOfFirstResult == null) { this.scanMetrics.countOfRegions.incrementAndGet(); } return true; } - private RegionServerCallable getSmallScanCallable( + static RegionServerCallable getSmallScanCallable( + final Scan sc, HConnection connection, TableName table, byte[] localStartKey, final int cacheNum) { - this.scan.setStartRow(localStartKey); + sc.setStartRow(localStartKey); RegionServerCallable callable = new RegionServerCallable( - getConnection(), getTable(), scan.getStartRow()) { + connection, table, sc.getStartRow()) { public Result[] call(int callTimeout) throws IOException { ScanRequest request = RequestConverter.buildScanRequest(getLocation() - .getRegionInfo().getRegionName(), scan, cacheNum, true); + .getRegionInfo().getRegionName(), sc, cacheNum, true); PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); controller.setPriority(getTableName()); controller.setCallTimeout(callTimeout); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index f3d44367019..624389225ca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -742,15 +742,24 @@ public class HTable implements HTableInterface { if (scan.getCaching() <= 0) { scan.setCaching(getScannerCaching()); } - if (scan.isSmall() && !scan.isReversed()) { - return new ClientSmallScanner(getConfiguration(), scan, getName(), - this.connection); - } else if (scan.isReversed()) { - return new ReversedClientScanner(getConfiguration(), scan, getName(), - this.connection); + + if (scan.isReversed()) { + if (scan.isSmall()) { + return new ClientSmallReversedScanner(getConfiguration(), scan, getName(), + this.connection); + } else { + return new ReversedClientScanner(getConfiguration(), scan, getName(), + this.connection); + } + } + + if (scan.isSmall()) { + return new ClientSmallScanner(getConfiguration(), scan, getName(), + this.connection, this.rpcCallerFactory); + } else { + return new ClientScanner(getConfiguration(), scan, + getName(), this.connection); } - return new ClientScanner(getConfiguration(), scan, - getName(), this.connection); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java index 470ffa132fc..d6e17ae8f55 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ExceptionUtil; /** * A reversed client scanner which support backward scanning @@ -114,6 +115,7 @@ public class ReversedClientScanner extends ClientScanner { this.scanMetrics.countOfRegions.incrementAndGet(); } } catch (IOException e) { + ExceptionUtil.rethrowIfInterrupt(e); close(); throw e; } @@ -151,7 +153,7 @@ public class ReversedClientScanner extends ClientScanner { * @param row * @return a new byte array which is the closest front row of the specified one */ - private byte[] createClosestRowBefore(byte[] row) { + protected byte[] createClosestRowBefore(byte[] row) { if (row == null) { throw new IllegalArgumentException("The passed row is empty"); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java index 487777fc8f3..a974b01c6f6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java @@ -70,7 +70,7 @@ public class ReversedScannerCallable extends ScannerCallable { this.location = connection.getRegionLocation(tableName, row, reload); if (this.location == null) { throw new IOException("Failed to find location, tableName=" - + tableName + ", row=" + Bytes.toString(row) + ", reload=" + + tableName + ", row=" + Bytes.toStringBinary(row) + ", reload=" + reload); } } else { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 33a18fb2c40..1520a65a174 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -6061,4 +6061,170 @@ public class TestFromClientSide { assertEquals(insertNum, count); table.close(); } + + + /** + * Tests reversed scan under multi regions + */ + @Test + public void testSmallReversedScanUnderMultiRegions() throws Exception { + // Test Initialization. + byte[] TABLE = Bytes.toBytes("testSmallReversedScanUnderMultiRegions"); + byte[][] splitRows = new byte[][]{ + Bytes.toBytes("000"), Bytes.toBytes("002"), Bytes.toBytes("004"), + Bytes.toBytes("006"), Bytes.toBytes("008"), Bytes.toBytes("010")}; + HTable table = TEST_UTIL.createTable(TABLE, FAMILY, splitRows); + TEST_UTIL.waitUntilAllRegionsAssigned(table.getName()); + + assertEquals(splitRows.length + 1, table.getRegionLocations().size()); + for (byte[] splitRow : splitRows) { + Put put = new Put(splitRow); + put.add(FAMILY, QUALIFIER, VALUE); + table.put(put); + + byte[] nextRow = Bytes.copy(splitRow); + nextRow[nextRow.length - 1]++; + + put = new Put(nextRow); + put.add(FAMILY, QUALIFIER, VALUE); + table.put(put); + } + + // scan forward + ResultScanner scanner = table.getScanner(new Scan()); + int count = 0; + for (Result r : scanner) { + assertTrue(!r.isEmpty()); + count++; + } + assertEquals(12, count); + + reverseScanTest(table, false); + reverseScanTest(table, true); + + table.close(); + } + + private void reverseScanTest(HTable table, boolean small) throws IOException { + // scan backward + Scan scan = new Scan(); + scan.setReversed(true); + ResultScanner scanner = table.getScanner(scan); + int count = 0; + byte[] lastRow = null; + for (Result r : scanner) { + assertTrue(!r.isEmpty()); + count++; + byte[] thisRow = r.getRow(); + if (lastRow != null) { + assertTrue("Error scan order, last row= " + Bytes.toString(lastRow) + + ",this row=" + Bytes.toString(thisRow), + Bytes.compareTo(thisRow, lastRow) < 0); + } + lastRow = thisRow; + } + assertEquals(12, count); + + scan = new Scan(); + scan.setSmall(small); + scan.setReversed(true); + scan.setStartRow(Bytes.toBytes("002")); + scanner = table.getScanner(scan); + count = 0; + lastRow = null; + for (Result r : scanner) { + assertTrue(!r.isEmpty()); + count++; + byte[] thisRow = r.getRow(); + if (lastRow != null) { + assertTrue("Error scan order, last row= " + Bytes.toString(lastRow) + + ",this row=" + Bytes.toString(thisRow), + Bytes.compareTo(thisRow, lastRow) < 0); + } + lastRow = thisRow; + } + assertEquals(3, count); // 000 001 002 + + scan = new Scan(); + scan.setSmall(small); + scan.setReversed(true); + scan.setStartRow(Bytes.toBytes("002")); + scan.setStopRow(Bytes.toBytes("000")); + scanner = table.getScanner(scan); + count = 0; + lastRow = null; + for (Result r : scanner) { + assertTrue(!r.isEmpty()); + count++; + byte[] thisRow = r.getRow(); + if (lastRow != null) { + assertTrue("Error scan order, last row= " + Bytes.toString(lastRow) + + ",this row=" + Bytes.toString(thisRow), + Bytes.compareTo(thisRow, lastRow) < 0); + } + lastRow = thisRow; + } + assertEquals(2, count); // 001 002 + + scan = new Scan(); + scan.setSmall(small); + scan.setReversed(true); + scan.setStartRow(Bytes.toBytes("001")); + scanner = table.getScanner(scan); + count = 0; + lastRow = null; + for (Result r : scanner) { + assertTrue(!r.isEmpty()); + count++; + byte[] thisRow = r.getRow(); + if (lastRow != null) { + assertTrue("Error scan order, last row= " + Bytes.toString(lastRow) + + ",this row=" + Bytes.toString(thisRow), + Bytes.compareTo(thisRow, lastRow) < 0); + } + lastRow = thisRow; + } + assertEquals(2, count); // 000 001 + + scan = new Scan(); + scan.setSmall(small); + scan.setReversed(true); + scan.setStartRow(Bytes.toBytes("000")); + scanner = table.getScanner(scan); + count = 0; + lastRow = null; + for (Result r : scanner) { + assertTrue(!r.isEmpty()); + count++; + byte[] thisRow = r.getRow(); + if (lastRow != null) { + assertTrue("Error scan order, last row= " + Bytes.toString(lastRow) + + ",this row=" + Bytes.toString(thisRow), + Bytes.compareTo(thisRow, lastRow) < 0); + } + lastRow = thisRow; + } + assertEquals(1, count); // 000 + + scan = new Scan(); + scan.setSmall(small); + scan.setReversed(true); + scan.setStartRow(Bytes.toBytes("006")); + scan.setStopRow(Bytes.toBytes("002")); + scanner = table.getScanner(scan); + count = 0; + lastRow = null; + for (Result r : scanner) { + assertTrue(!r.isEmpty()); + count++; + byte[] thisRow = r.getRow(); + if (lastRow != null) { + assertTrue("Error scan order, last row= " + Bytes.toString(lastRow) + + ",this row=" + Bytes.toString(thisRow), + Bytes.compareTo(thisRow, lastRow) < 0); + } + lastRow = thisRow; + } + assertEquals(4, count); // 003 004 005 006 + } }