HBASE-16886 hbase-client: scanner with reversed=true and small=true gets no result (huzheng)

This commit is contained in:
tedyu 2016-10-27 07:47:49 -07:00
parent 24a92ed63a
commit d4826e1665
3 changed files with 248 additions and 21 deletions

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -29,13 +30,18 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallable; import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallable;
import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory; import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
/** /**
@ -49,8 +55,8 @@ import java.util.concurrent.ExecutorService;
@InterfaceAudience.Private @InterfaceAudience.Private
public class ClientSmallReversedScanner extends ReversedClientScanner { public class ClientSmallReversedScanner extends ReversedClientScanner {
private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class); private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class);
private ScannerCallableWithReplicas smallScanCallable = null; private ScannerCallableWithReplicas smallReversedScanCallable = null;
private SmallScannerCallableFactory callableFactory; private SmallReversedScannerCallableFactory callableFactory;
/** /**
* Create a new ReversibleClientScanner for the specified table. Take note that the passed * Create a new ReversibleClientScanner for the specified table. Take note that the passed
@ -80,7 +86,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
throws IOException { throws IOException {
this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
primaryOperationTimeout, new SmallScannerCallableFactory()); primaryOperationTimeout, new SmallReversedScannerCallableFactory());
} }
/** /**
@ -112,7 +118,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
ClientSmallReversedScanner(final Configuration conf, final Scan scan, final TableName tableName, ClientSmallReversedScanner(final Configuration conf, final Scan scan, final TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout, RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
SmallScannerCallableFactory callableFactory) throws IOException { SmallReversedScannerCallableFactory callableFactory) throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
primaryOperationTimeout); primaryOperationTimeout);
this.callableFactory = callableFactory; this.callableFactory = callableFactory;
@ -136,6 +142,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
byte[] localStartKey; byte[] localStartKey;
int cacheNum = nbRows; int cacheNum = nbRows;
boolean regionChanged = true; boolean regionChanged = true;
boolean isFirstRegionToLocate = false;
// if we're at end of table, close and return false to stop iterating // if we're at end of table, close and return false to stop iterating
if (this.currentRegion != null && currentRegionDone) { if (this.currentRegion != null && currentRegionDone) {
byte[] startKey = this.currentRegion.getStartKey(); byte[] startKey = this.currentRegion.getStartKey();
@ -158,6 +165,14 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
localStartKey = createClosestRowBefore(lastResult.getRow()); localStartKey = createClosestRowBefore(lastResult.getRow());
} else { } else {
localStartKey = this.scan.getStartRow(); localStartKey = this.scan.getStartRow();
isFirstRegionToLocate = true;
}
if (!isFirstRegionToLocate
&& (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) {
// when non-firstRegion & localStartKey is empty bytes, no more rowKey should scan.
// otherwise, maybe infinity results with RowKey=0x00 will return.
return false;
} }
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
@ -165,9 +180,10 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
+ Bytes.toStringBinary(localStartKey) + "'"); + Bytes.toStringBinary(localStartKey) + "'");
} }
smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan, smallReversedScanCallable =
getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(), callableFactory.getCallable(getConnection(), getTable(), scan, getScanMetrics(),
getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller); localStartKey, cacheNum, rpcControllerFactory, getPool(), getPrimaryOperationTimeout(),
getRetries(), getScannerTimeout(), getConf(), caller, isFirstRegionToLocate);
if (this.scanMetrics != null && regionChanged) { if (this.scanMetrics != null && regionChanged) {
this.scanMetrics.countOfRegions.incrementAndGet(); this.scanMetrics.countOfRegions.incrementAndGet();
@ -209,8 +225,8 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
// exhausted current region. // exhausted current region.
// callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas, // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
// we do a callWithRetries // we do a callWithRetries
values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout); values = this.caller.callWithoutRetries(smallReversedScanCallable, scannerTimeout);
this.currentRegion = smallScanCallable.getHRegionInfo(); this.currentRegion = smallReversedScanCallable.getHRegionInfo();
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
if (this.scanMetrics != null) { if (this.scanMetrics != null) {
this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
@ -229,8 +245,8 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
this.lastResult = rs; this.lastResult = rs;
} }
} }
if (smallScanCallable.hasMoreResultsContext()) { if (smallReversedScanCallable.hasMoreResultsContext()) {
currentRegionDone = !smallScanCallable.getServerHasMoreResults(); currentRegionDone = !smallReversedScanCallable.getServerHasMoreResults();
} else { } else {
currentRegionDone = countdown > 0; currentRegionDone = countdown > 0;
} }
@ -250,7 +266,80 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
} }
@VisibleForTesting @VisibleForTesting
protected void setScannerCallableFactory(SmallScannerCallableFactory callableFactory) { protected void setScannerCallableFactory(SmallReversedScannerCallableFactory callableFactory) {
this.callableFactory = callableFactory; this.callableFactory = callableFactory;
} }
/**
* A reversed ScannerCallable which supports backward small scanning.
*/
static class SmallReversedScannerCallable extends ReversedScannerCallable {
public SmallReversedScannerCallable(ClusterConnection connection, TableName table, Scan scan,
ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory controllerFactory,
int caching, int replicaId) {
super(connection, table, scan, scanMetrics, locateStartRow, controllerFactory, replicaId);
this.setCaching(caching);
}
@Override
public Result[] call(int timeout) throws IOException {
if (this.closed) return null;
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
ClientProtos.ScanRequest request = RequestConverter.buildScanRequest(
getLocation().getRegionInfo().getRegionName(), getScan(), getCaching(), true);
ClientProtos.ScanResponse response = null;
controller = controllerFactory.newController();
try {
controller.setPriority(getTableName());
controller.setCallTimeout(timeout);
response = getStub().scan(controller, request);
Result[] results = ResponseConverter.getResults(controller.cellScanner(), response);
if (response.hasMoreResultsInRegion()) {
setHasMoreResultsContext(true);
setServerHasMoreResults(response.getMoreResultsInRegion());
} else {
setHasMoreResultsContext(false);
}
// We need to update result metrics since we are overriding call()
updateResultsMetrics(results);
return results;
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
@Override
public ScannerCallable getScannerCallableForReplica(int id) {
return new SmallReversedScannerCallable(getConnection(), getTableName(), getScan(),
scanMetrics, locateStartRow, controllerFactory, getCaching(), id);
}
}
protected static class SmallReversedScannerCallableFactory {
public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller<Result[]> caller,
boolean isFirstRegionToLocate) {
byte[] locateStartRow = null;
if (isFirstRegionToLocate
&& (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) {
// HBASE-16886: if not setting startRow, then we will use a range [MAX_BYTE_ARRAY, +oo) to
// locate a region list, and the last one in region list is the region where our scan start.
locateStartRow = ClientScanner.MAX_BYTE_ARRAY;
}
scan.setStartRow(localStartKey);
SmallReversedScannerCallable s = new SmallReversedScannerCallable(connection, table, scan,
scanMetrics, locateStartRow, controllerFactory, cacheNum, 0);
ScannerCallableWithReplicas scannerCallableWithReplicas =
new ScannerCallableWithReplicas(table, connection, s, pool, primaryOperationTimeout, scan,
retries, scannerTimeout, cacheNum, conf, caller);
return scannerCallableWithReplicas;
}
}
} }

View File

@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory; import org.apache.hadoop.hbase.client.ClientSmallReversedScanner.SmallReversedScannerCallableFactory;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
@ -102,15 +102,15 @@ public class TestClientSmallReversedScanner {
}; };
} }
private SmallScannerCallableFactory getFactory( private SmallReversedScannerCallableFactory getFactory(
final ScannerCallableWithReplicas callableWithReplicas) { final ScannerCallableWithReplicas callableWithReplicas) {
return new SmallScannerCallableFactory() { return new SmallReversedScannerCallableFactory() {
@Override @Override
public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table, public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum, Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
RpcControllerFactory controllerFactory, ExecutorService pool, RpcControllerFactory controllerFactory, ExecutorService pool,
int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf, int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf,
RpcRetryingCaller<Result[]> caller) { RpcRetryingCaller<Result[]> caller, boolean isFirstRegionToLocate) {
return callableWithReplicas; return callableWithReplicas;
} }
}; };
@ -135,7 +135,7 @@ public class TestClientSmallReversedScanner {
// Intentionally leave a "default" caching size in the Scan. No matter the value, we // Intentionally leave a "default" caching size in the Scan. No matter the value, we
// should continue based on the server context // should continue based on the server context
SmallScannerCallableFactory factory = getFactory(callableWithReplicas); SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas);
try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan, try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool, TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
@ -204,7 +204,7 @@ public class TestClientSmallReversedScanner {
// While the server returns 2 records per batch, we expect more records. // While the server returns 2 records per batch, we expect more records.
scan.setCaching(2); scan.setCaching(2);
SmallScannerCallableFactory factory = getFactory(callableWithReplicas); SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas);
try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan, try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool, TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
@ -280,7 +280,7 @@ public class TestClientSmallReversedScanner {
// While the server return 2 records per RPC, we expect there to be more records. // While the server return 2 records per RPC, we expect there to be more records.
scan.setCaching(2); scan.setCaching(2);
SmallScannerCallableFactory factory = getFactory(callableWithReplicas); SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas);
try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan, try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool, TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
@ -316,7 +316,7 @@ public class TestClientSmallReversedScanner {
ScannerCallableWithReplicas callableWithReplicas = Mockito ScannerCallableWithReplicas callableWithReplicas = Mockito
.mock(ScannerCallableWithReplicas.class); .mock(ScannerCallableWithReplicas.class);
SmallScannerCallableFactory factory = getFactory(callableWithReplicas); SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas);
try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan, try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool, TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,

View File

@ -0,0 +1,138 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
@Category(MediumTests.class)
public class TestSmallReversedScanner {
public static final Log LOG = LogFactory.getLog(TestSmallReversedScanner.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final TableName TABLE_NAME = TableName.valueOf("testReversedSmall");
private static final byte[] COLUMN_FAMILY = Bytes.toBytes("columnFamily");
private static Table htable = null;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(1);
// create a table with 4 region: (-oo, b),[b,c),[c,d),[d,+oo)
byte[] bytes = Bytes.toBytes("bcd");
byte[][] splitKeys = new byte[bytes.length][];
for (int i = 0; i < bytes.length; i++) {
splitKeys[i] = new byte[] { bytes[i] };
}
htable = TEST_UTIL.createTable(TABLE_NAME, COLUMN_FAMILY, splitKeys);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@After
public void tearDown() throws IOException {
TEST_UTIL.deleteTableData(TABLE_NAME);
}
/**
* all rowKeys are fit in the last region.
* @throws IOException
*/
@Test
public void testSmallReversedScan01() throws IOException {
String[][] keysCases = new String[][] {
{ "d0", "d1", "d2", "d3" }, // all rowKeys fit in the last region.
{ "a0", "a1", "a2", "a3" }, // all rowKeys fit in the first region.
{ "a0", "b1", "c2", "d3" }, // each region with a rowKey
};
for (int caseIndex = 0; caseIndex < keysCases.length; caseIndex++) {
testSmallReversedScanInternal(keysCases[caseIndex]);
TEST_UTIL.deleteTableData(TABLE_NAME);
}
}
private void testSmallReversedScanInternal(String[] inputRowKeys) throws IOException {
int rowCount = inputRowKeys.length;
for (int i = 0; i < rowCount; i++) {
Put put = new Put(Bytes.toBytes(inputRowKeys[i]));
put.addColumn(COLUMN_FAMILY, null, Bytes.toBytes(i));
htable.put(put);
}
Scan scan = new Scan();
scan.setReversed(true);
scan.setSmall(true);
ResultScanner scanner = htable.getScanner(scan);
Result r;
int value = rowCount;
while ((r = scanner.next()) != null) {
Assert.assertArrayEquals(r.getValue(COLUMN_FAMILY, null), Bytes.toBytes(--value));
Assert.assertArrayEquals(r.getRow(), Bytes.toBytes(inputRowKeys[value]));
}
Assert.assertEquals(value, 0);
}
/**
* Corner case:
* HBase has 4 regions, (-oo,b),[b,c),[c,d),[d,+oo), and only rowKey with byte[]={0x00} locate in region (-oo,b) .
* test whether reversed small scanner will return infinity results with RowKey={0x00}.
* @throws IOException
*/
@Test
public void testSmallReversedScan02() throws IOException {
Put put = new Put(new byte[] { (char) 0x00 });
put.addColumn(COLUMN_FAMILY, null, Bytes.toBytes(0));
htable.put(put);
Scan scan = new Scan();
scan.setCaching(1);
scan.setReversed(true);
scan.setSmall(true);
ResultScanner scanner = htable.getScanner(scan);
Result r;
int count = 1;
while ((r = scanner.next()) != null) {
Assert.assertArrayEquals(r.getValue(COLUMN_FAMILY, null), Bytes.toBytes(0));
Assert.assertArrayEquals(r.getRow(), new byte[] { (char) 0x00 });
Assert.assertTrue(--count >= 0);
}
Assert.assertEquals(count, 0);
}
}