From d4826e1665085b0ef697db548f8b6277be256591 Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 27 Oct 2016 07:47:49 -0700 Subject: [PATCH] HBASE-16886 hbase-client: scanner with reversed=true and small=true gets no result (huzheng) --- .../client/ClientSmallReversedScanner.java | 115 +++++++++++++-- .../TestClientSmallReversedScanner.java | 16 +- .../client/TestSmallReversedScanner.java | 138 ++++++++++++++++++ 3 files changed, 248 insertions(+), 21 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSmallReversedScanner.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java index d4de6a0b860..bd5575a8d34 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client; +import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -29,13 +30,18 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallable; -import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.ResponseConverter; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.Bytes; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.concurrent.ExecutorService; /** @@ -49,8 +55,8 @@ import java.util.concurrent.ExecutorService; @InterfaceAudience.Private public class ClientSmallReversedScanner extends ReversedClientScanner { private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class); - private ScannerCallableWithReplicas smallScanCallable = null; - private SmallScannerCallableFactory callableFactory; + private ScannerCallableWithReplicas smallReversedScanCallable = null; + private SmallReversedScannerCallableFactory callableFactory; /** * Create a new ReversibleClientScanner for the specified table. Take note that the passed @@ -80,7 +86,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) throws IOException { this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, - primaryOperationTimeout, new SmallScannerCallableFactory()); + primaryOperationTimeout, new SmallReversedScannerCallableFactory()); } /** @@ -112,7 +118,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { ClientSmallReversedScanner(final Configuration conf, final Scan scan, final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout, - SmallScannerCallableFactory callableFactory) throws IOException { + SmallReversedScannerCallableFactory callableFactory) throws IOException { super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, primaryOperationTimeout); this.callableFactory = callableFactory; @@ -136,6 +142,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { byte[] localStartKey; int cacheNum = nbRows; boolean regionChanged = true; + boolean isFirstRegionToLocate = false; // if we're at end of table, close and return false to stop iterating if (this.currentRegion != null && currentRegionDone) { byte[] startKey = this.currentRegion.getStartKey(); @@ -158,6 +165,14 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { localStartKey = createClosestRowBefore(lastResult.getRow()); } else { localStartKey = this.scan.getStartRow(); + isFirstRegionToLocate = true; + } + + if (!isFirstRegionToLocate + && (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) { + // when non-firstRegion & localStartKey is empty bytes, no more rowKey should scan. + // otherwise, maybe infinity results with RowKey=0x00 will return. + return false; } if (LOG.isTraceEnabled()) { @@ -165,9 +180,10 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { + Bytes.toStringBinary(localStartKey) + "'"); } - smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan, - getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(), - getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller); + smallReversedScanCallable = + callableFactory.getCallable(getConnection(), getTable(), scan, getScanMetrics(), + localStartKey, cacheNum, rpcControllerFactory, getPool(), getPrimaryOperationTimeout(), + getRetries(), getScannerTimeout(), getConf(), caller, isFirstRegionToLocate); if (this.scanMetrics != null && regionChanged) { this.scanMetrics.countOfRegions.incrementAndGet(); @@ -209,8 +225,8 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { // exhausted current region. // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas, // we do a callWithRetries - values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout); - this.currentRegion = smallScanCallable.getHRegionInfo(); + values = this.caller.callWithoutRetries(smallReversedScanCallable, scannerTimeout); + this.currentRegion = smallReversedScanCallable.getHRegionInfo(); long currentTime = System.currentTimeMillis(); if (this.scanMetrics != null) { this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime @@ -229,8 +245,8 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { this.lastResult = rs; } } - if (smallScanCallable.hasMoreResultsContext()) { - currentRegionDone = !smallScanCallable.getServerHasMoreResults(); + if (smallReversedScanCallable.hasMoreResultsContext()) { + currentRegionDone = !smallReversedScanCallable.getServerHasMoreResults(); } else { currentRegionDone = countdown > 0; } @@ -250,7 +266,80 @@ public class ClientSmallReversedScanner extends ReversedClientScanner { } @VisibleForTesting - protected void setScannerCallableFactory(SmallScannerCallableFactory callableFactory) { + protected void setScannerCallableFactory(SmallReversedScannerCallableFactory callableFactory) { this.callableFactory = callableFactory; } + + /** + * A reversed ScannerCallable which supports backward small scanning. + */ + static class SmallReversedScannerCallable extends ReversedScannerCallable { + + public SmallReversedScannerCallable(ClusterConnection connection, TableName table, Scan scan, + ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory controllerFactory, + int caching, int replicaId) { + super(connection, table, scan, scanMetrics, locateStartRow, controllerFactory, replicaId); + this.setCaching(caching); + } + + @Override + public Result[] call(int timeout) throws IOException { + if (this.closed) return null; + if (Thread.interrupted()) { + throw new InterruptedIOException(); + } + ClientProtos.ScanRequest request = RequestConverter.buildScanRequest( + getLocation().getRegionInfo().getRegionName(), getScan(), getCaching(), true); + ClientProtos.ScanResponse response = null; + controller = controllerFactory.newController(); + try { + controller.setPriority(getTableName()); + controller.setCallTimeout(timeout); + response = getStub().scan(controller, request); + Result[] results = ResponseConverter.getResults(controller.cellScanner(), response); + if (response.hasMoreResultsInRegion()) { + setHasMoreResultsContext(true); + setServerHasMoreResults(response.getMoreResultsInRegion()); + } else { + setHasMoreResultsContext(false); + } + // We need to update result metrics since we are overriding call() + updateResultsMetrics(results); + return results; + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + + @Override + public ScannerCallable getScannerCallableForReplica(int id) { + return new SmallReversedScannerCallable(getConnection(), getTableName(), getScan(), + scanMetrics, locateStartRow, controllerFactory, getCaching(), id); + } + } + + protected static class SmallReversedScannerCallableFactory { + + public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table, + Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum, + RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout, + int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller caller, + boolean isFirstRegionToLocate) { + byte[] locateStartRow = null; + if (isFirstRegionToLocate + && (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) { + // HBASE-16886: if not setting startRow, then we will use a range [MAX_BYTE_ARRAY, +oo) to + // locate a region list, and the last one in region list is the region where our scan start. + locateStartRow = ClientScanner.MAX_BYTE_ARRAY; + } + + scan.setStartRow(localStartKey); + SmallReversedScannerCallable s = new SmallReversedScannerCallable(connection, table, scan, + scanMetrics, locateStartRow, controllerFactory, cacheNum, 0); + ScannerCallableWithReplicas scannerCallableWithReplicas = + new ScannerCallableWithReplicas(table, connection, s, pool, primaryOperationTimeout, scan, + retries, scannerTimeout, cacheNum, conf, caller); + return scannerCallableWithReplicas; + } + } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java index 4611d08dfe1..57b52e6c801 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java @@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory; +import org.apache.hadoop.hbase.client.ClientSmallReversedScanner.SmallReversedScannerCallableFactory; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -102,15 +102,15 @@ public class TestClientSmallReversedScanner { }; } - private SmallScannerCallableFactory getFactory( + private SmallReversedScannerCallableFactory getFactory( final ScannerCallableWithReplicas callableWithReplicas) { - return new SmallScannerCallableFactory() { + return new SmallReversedScannerCallableFactory() { @Override public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table, Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum, RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf, - RpcRetryingCaller caller) { + RpcRetryingCaller caller, boolean isFirstRegionToLocate) { return callableWithReplicas; } }; @@ -135,7 +135,7 @@ public class TestClientSmallReversedScanner { // Intentionally leave a "default" caching size in the Scan. No matter the value, we // should continue based on the server context - SmallScannerCallableFactory factory = getFactory(callableWithReplicas); + SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas); try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan, TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool, @@ -204,7 +204,7 @@ public class TestClientSmallReversedScanner { // While the server returns 2 records per batch, we expect more records. scan.setCaching(2); - SmallScannerCallableFactory factory = getFactory(callableWithReplicas); + SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas); try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan, TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool, @@ -280,7 +280,7 @@ public class TestClientSmallReversedScanner { // While the server return 2 records per RPC, we expect there to be more records. scan.setCaching(2); - SmallScannerCallableFactory factory = getFactory(callableWithReplicas); + SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas); try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan, TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool, @@ -316,7 +316,7 @@ public class TestClientSmallReversedScanner { ScannerCallableWithReplicas callableWithReplicas = Mockito .mock(ScannerCallableWithReplicas.class); - SmallScannerCallableFactory factory = getFactory(callableWithReplicas); + SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas); try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan, TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSmallReversedScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSmallReversedScanner.java new file mode 100644 index 00000000000..3a4e92b822f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSmallReversedScanner.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; + +@Category(MediumTests.class) +public class TestSmallReversedScanner { + public static final Log LOG = LogFactory.getLog(TestSmallReversedScanner.class); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static final TableName TABLE_NAME = TableName.valueOf("testReversedSmall"); + private static final byte[] COLUMN_FAMILY = Bytes.toBytes("columnFamily"); + + private static Table htable = null; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(1); + + // create a table with 4 region: (-oo, b),[b,c),[c,d),[d,+oo) + byte[] bytes = Bytes.toBytes("bcd"); + byte[][] splitKeys = new byte[bytes.length][]; + + for (int i = 0; i < bytes.length; i++) { + splitKeys[i] = new byte[] { bytes[i] }; + } + htable = TEST_UTIL.createTable(TABLE_NAME, COLUMN_FAMILY, splitKeys); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @After + public void tearDown() throws IOException { + TEST_UTIL.deleteTableData(TABLE_NAME); + } + + /** + * all rowKeys are fit in the last region. + * @throws IOException + */ + @Test + public void testSmallReversedScan01() throws IOException { + String[][] keysCases = new String[][] { + { "d0", "d1", "d2", "d3" }, // all rowKeys fit in the last region. + { "a0", "a1", "a2", "a3" }, // all rowKeys fit in the first region. + { "a0", "b1", "c2", "d3" }, // each region with a rowKey + }; + + for (int caseIndex = 0; caseIndex < keysCases.length; caseIndex++) { + testSmallReversedScanInternal(keysCases[caseIndex]); + TEST_UTIL.deleteTableData(TABLE_NAME); + } + } + + private void testSmallReversedScanInternal(String[] inputRowKeys) throws IOException { + int rowCount = inputRowKeys.length; + + for (int i = 0; i < rowCount; i++) { + Put put = new Put(Bytes.toBytes(inputRowKeys[i])); + put.addColumn(COLUMN_FAMILY, null, Bytes.toBytes(i)); + htable.put(put); + } + + Scan scan = new Scan(); + scan.setReversed(true); + scan.setSmall(true); + + ResultScanner scanner = htable.getScanner(scan); + Result r; + int value = rowCount; + while ((r = scanner.next()) != null) { + Assert.assertArrayEquals(r.getValue(COLUMN_FAMILY, null), Bytes.toBytes(--value)); + Assert.assertArrayEquals(r.getRow(), Bytes.toBytes(inputRowKeys[value])); + } + + Assert.assertEquals(value, 0); + } + + /** + * Corner case: + * HBase has 4 regions, (-oo,b),[b,c),[c,d),[d,+oo), and only rowKey with byte[]={0x00} locate in region (-oo,b) . + * test whether reversed small scanner will return infinity results with RowKey={0x00}. + * @throws IOException + */ + @Test + public void testSmallReversedScan02() throws IOException { + Put put = new Put(new byte[] { (char) 0x00 }); + put.addColumn(COLUMN_FAMILY, null, Bytes.toBytes(0)); + htable.put(put); + + Scan scan = new Scan(); + scan.setCaching(1); + scan.setReversed(true); + scan.setSmall(true); + + ResultScanner scanner = htable.getScanner(scan); + Result r; + int count = 1; + while ((r = scanner.next()) != null) { + Assert.assertArrayEquals(r.getValue(COLUMN_FAMILY, null), Bytes.toBytes(0)); + Assert.assertArrayEquals(r.getRow(), new byte[] { (char) 0x00 }); + Assert.assertTrue(--count >= 0); + } + Assert.assertEquals(count, 0); + } +}