HBASE-16886 hbase-client: scanner with reversed=true and small=true gets no result (huzheng)

This commit is contained in:
tedyu 2016-10-27 06:54:13 -07:00
parent a3f1490601
commit d35b65883c
3 changed files with 241 additions and 23 deletions

View File

@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBef
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
@ -34,8 +35,11 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallable;
import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
/**
@ -49,8 +53,8 @@ import org.apache.hadoop.hbase.util.Bytes;
@InterfaceAudience.Private
public class ClientSmallReversedScanner extends ReversedClientScanner {
private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class);
private ScannerCallableWithReplicas smallScanCallable = null;
private SmallScannerCallableFactory callableFactory;
private ScannerCallableWithReplicas smallReversedScanCallable = null;
private SmallReversedScannerCallableFactory callableFactory;
/**
* Create a new ReversibleClientScanner for the specified table. Take note that the passed
@ -80,7 +84,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
throws IOException {
this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
primaryOperationTimeout, new SmallScannerCallableFactory());
primaryOperationTimeout, new SmallReversedScannerCallableFactory());
}
/**
@ -112,7 +116,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
ClientSmallReversedScanner(final Configuration conf, final Scan scan, final TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
SmallScannerCallableFactory callableFactory) throws IOException {
SmallReversedScannerCallableFactory callableFactory) throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
primaryOperationTimeout);
this.callableFactory = callableFactory;
@ -136,6 +140,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
byte[] localStartKey;
int cacheNum = nbRows;
boolean regionChanged = true;
boolean isFirstRegionToLocate = false;
// if we're at end of table, close and return false to stop iterating
if (this.currentRegion != null && currentRegionDone) {
byte[] startKey = this.currentRegion.getStartKey();
@ -157,17 +162,26 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
regionChanged = false;
localStartKey = createClosestRowBefore(lastResult.getRow());
} else {
isFirstRegionToLocate = true;
localStartKey = this.scan.getStartRow();
}
if (!isFirstRegionToLocate
&& (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) {
// when non-firstRegion & localStartKey is empty bytes, no more rowKey should scan.
// otherwise, maybe infinity results with RowKey=0x00 will return.
return false;
}
if (LOG.isTraceEnabled()) {
LOG.trace("Advancing internal small scanner to startKey at '"
+ Bytes.toStringBinary(localStartKey) + "'");
}
smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan,
getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(),
getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller);
smallReversedScanCallable =
callableFactory.getCallable(getConnection(), getTable(), scan, getScanMetrics(),
localStartKey, cacheNum, rpcControllerFactory, getPool(), getPrimaryOperationTimeout(),
getRetries(), getScannerTimeout(), getConf(), caller, isFirstRegionToLocate);
if (this.scanMetrics != null && regionChanged) {
this.scanMetrics.countOfRegions.incrementAndGet();
@ -209,8 +223,8 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
// exhausted current region.
// callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
// we do a callWithRetries
values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
this.currentRegion = smallScanCallable.getHRegionInfo();
values = this.caller.callWithoutRetries(smallReversedScanCallable, scannerTimeout);
this.currentRegion = smallReversedScanCallable.getHRegionInfo();
long currentTime = System.currentTimeMillis();
if (this.scanMetrics != null) {
this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
@ -229,8 +243,8 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
this.lastResult = rs;
}
}
if (smallScanCallable.hasMoreResultsContext()) {
currentRegionDone = !smallScanCallable.getServerHasMoreResults();
if (smallReversedScanCallable.hasMoreResultsContext()) {
currentRegionDone = !smallReversedScanCallable.getServerHasMoreResults();
} else {
currentRegionDone = countdown > 0;
}
@ -249,8 +263,74 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
closed = true;
}
/**
* A reversed ScannerCallable which supports backward small scanning.
*/
static class SmallReversedScannerCallable extends ReversedScannerCallable {
public SmallReversedScannerCallable(ClusterConnection connection, TableName table, Scan scan,
ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory controllerFactory,
int caching, int replicaId) {
super(connection, table, scan, scanMetrics, locateStartRow, controllerFactory, replicaId);
this.setCaching(caching);
}
@Override
protected Result[] rpcCall() throws Exception {
if (this.closed) return null;
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
ClientProtos.ScanRequest request = RequestConverter.buildScanRequest(
getLocation().getRegionInfo().getRegionName(), getScan(), getCaching(), true);
ClientProtos.ScanResponse response = null;
response = getStub().scan(getRpcController(), request);
Result[] results = ResponseConverter.getResults(getRpcControllerCellScanner(), response);
if (response.hasMoreResultsInRegion()) {
setHasMoreResultsContext(true);
setServerHasMoreResults(response.getMoreResultsInRegion());
} else {
setHasMoreResultsContext(false);
}
// We need to update result metrics since we are overriding call()
updateResultsMetrics(results);
return results;
}
@Override
public ScannerCallable getScannerCallableForReplica(int id) {
return new SmallReversedScannerCallable(getConnection(), getTableName(), getScan(),
scanMetrics, locateStartRow, rpcControllerFactory, getCaching(), id);
}
}
@VisibleForTesting
protected void setScannerCallableFactory(SmallScannerCallableFactory callableFactory) {
protected void setScannerCallableFactory(SmallReversedScannerCallableFactory callableFactory) {
this.callableFactory = callableFactory;
}
protected static class SmallReversedScannerCallableFactory {
public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller<Result[]> caller,
boolean isFirstRegionToLocate) {
byte[] locateStartRow = null;
if (isFirstRegionToLocate
&& (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) {
// HBASE-16886: if not setting startRow, then we will use a range [MAX_BYTE_ARRAY, +oo) to
// locate a region list, and the last one in region list is the region where our scan start.
locateStartRow = ClientScanner.MAX_BYTE_ARRAY;
}
scan.setStartRow(localStartKey);
SmallReversedScannerCallable s = new SmallReversedScannerCallable(connection, table, scan,
scanMetrics, locateStartRow, controllerFactory, cacheNum, 0);
ScannerCallableWithReplicas scannerCallableWithReplicas =
new ScannerCallableWithReplicas(table, connection, s, pool, primaryOperationTimeout, scan,
retries, scannerTimeout, cacheNum, conf, caller);
return scannerCallableWithReplicas;
}
}
}

View File

@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory;
import org.apache.hadoop.hbase.client.ClientSmallReversedScanner.SmallReversedScannerCallableFactory;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@ -98,15 +98,15 @@ public class TestClientSmallReversedScanner {
};
}
private SmallScannerCallableFactory getFactory(
private SmallReversedScannerCallableFactory getFactory(
final ScannerCallableWithReplicas callableWithReplicas) {
return new SmallScannerCallableFactory() {
return new SmallReversedScannerCallableFactory() {
@Override
public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
RpcControllerFactory controllerFactory, ExecutorService pool,
int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf,
RpcRetryingCaller<Result[]> caller) {
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller<Result[]> caller,
boolean isFirstRegionToLocate) {
return callableWithReplicas;
}
};
@ -131,7 +131,7 @@ public class TestClientSmallReversedScanner {
// Intentionally leave a "default" caching size in the Scan. No matter the value, we
// should continue based on the server context
SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas);
try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
@ -200,7 +200,7 @@ public class TestClientSmallReversedScanner {
// While the server returns 2 records per batch, we expect more records.
scan.setCaching(2);
SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas);
try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
@ -276,7 +276,7 @@ public class TestClientSmallReversedScanner {
// While the server return 2 records per RPC, we expect there to be more records.
scan.setCaching(2);
SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas);
try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
@ -312,7 +312,7 @@ public class TestClientSmallReversedScanner {
ScannerCallableWithReplicas callableWithReplicas = Mockito
.mock(ScannerCallableWithReplicas.class);
SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas);
try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,

View File

@ -0,0 +1,138 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
@Category(MediumTests.class)
public class TestSmallReversedScanner {
public static final Log LOG = LogFactory.getLog(TestSmallReversedScanner.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final TableName TABLE_NAME = TableName.valueOf("testReversedSmall");
private static final byte[] COLUMN_FAMILY = Bytes.toBytes("columnFamily");
private static Table htable = null;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(1);
// create a table with 4 region: (-oo, b),[b,c),[c,d),[d,+oo)
byte[] bytes = Bytes.toBytes("bcd");
byte[][] splitKeys = new byte[bytes.length][];
for (int i = 0; i < bytes.length; i++) {
splitKeys[i] = new byte[] { bytes[i] };
}
htable = TEST_UTIL.createTable(TABLE_NAME, COLUMN_FAMILY, splitKeys);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@After
public void tearDown() throws IOException {
TEST_UTIL.truncateTable(TABLE_NAME);
}
/**
* all rowKeys are fit in the last region.
* @throws IOException
*/
@Test
public void testSmallReversedScan01() throws IOException {
String[][] keysCases = new String[][] {
{ "d0", "d1", "d2", "d3" }, // all rowKeys fit in the last region.
{ "a0", "a1", "a2", "a3" }, // all rowKeys fit in the first region.
{ "a0", "b1", "c2", "d3" }, // each region with a rowKey
};
for (int caseIndex = 0; caseIndex < keysCases.length; caseIndex++) {
testSmallReversedScanInternal(keysCases[caseIndex]);
TEST_UTIL.truncateTable(TABLE_NAME);
}
}
private void testSmallReversedScanInternal(String[] inputRowKeys) throws IOException {
int rowCount = inputRowKeys.length;
for (int i = 0; i < rowCount; i++) {
Put put = new Put(Bytes.toBytes(inputRowKeys[i]));
put.addColumn(COLUMN_FAMILY, null, Bytes.toBytes(i));
htable.put(put);
}
Scan scan = new Scan();
scan.setReversed(true);
scan.setSmall(true);
ResultScanner scanner = htable.getScanner(scan);
Result r;
int value = rowCount;
while ((r = scanner.next()) != null) {
Assert.assertArrayEquals(r.getValue(COLUMN_FAMILY, null), Bytes.toBytes(--value));
Assert.assertArrayEquals(r.getRow(), Bytes.toBytes(inputRowKeys[value]));
}
Assert.assertEquals(value, 0);
}
/**
* Corner case:
* HBase has 4 regions, (-oo,b),[b,c),[c,d),[d,+oo), and only rowKey with byte[]={0x00} locate in region (-oo,b) .
* test whether reversed small scanner will return infinity results with RowKey={0x00}.
* @throws IOException
*/
@Test
public void testSmallReversedScan02() throws IOException {
Put put = new Put(new byte[] { (char) 0x00 });
put.addColumn(COLUMN_FAMILY, null, Bytes.toBytes(0));
htable.put(put);
Scan scan = new Scan();
scan.setCaching(1);
scan.setReversed(true);
scan.setSmall(true);
ResultScanner scanner = htable.getScanner(scan);
Result r;
int count = 1;
while ((r = scanner.next()) != null) {
Assert.assertArrayEquals(r.getValue(COLUMN_FAMILY, null), Bytes.toBytes(0));
Assert.assertArrayEquals(r.getRow(), new byte[] { (char) 0x00 });
Assert.assertTrue(--count >= 0);
}
Assert.assertEquals(count, 0);
}
}