HBASE-13335 Use serverHasMoreResults context in SmallScanner and SmallReversedScanner.

Use the context passed back via ScanResponse that a RegionServer
fills in to denote whether or not more results existing in the
current Region. Add a simple factory to remove a static method
used across both SmallScanner and SmallReverseScanner. Add new
unit tests for both scanner classes to test scans with and
without the new context (as a quick backward-compatibility test).
This commit is contained in:
Josh Elser 2015-03-30 16:55:47 -04:00 committed by Enis Soztutar
parent 35ddea75c4
commit ffdcc00952
4 changed files with 945 additions and 130 deletions

View File

@ -31,9 +31,13 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallable;
import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.annotations.VisibleForTesting;
/** /**
* Client scanner for small reversed scan. Generally, only one RPC is called to fetch the * Client scanner for small reversed scan. Generally, only one RPC is called to fetch the
* scan results, unless the results cross multiple regions or the row count of * scan results, unless the results cross multiple regions or the row count of
@ -46,33 +50,83 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class); private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class);
private ScannerCallableWithReplicas smallScanCallable = null; private ScannerCallableWithReplicas smallScanCallable = null;
private byte[] skipRowOfFirstResult = null; private byte[] skipRowOfFirstResult = null;
private SmallScannerCallableFactory callableFactory;
/** /**
* Create a new ReversibleClientScanner for the specified table Note that the * Create a new ReversibleClientScanner for the specified table. Take note that the passed
* passed {@link Scan}'s start row maybe changed changed. * {@link Scan} 's start row maybe changed changed.
* *
* @param conf The {@link Configuration} to use. * @param conf
* @param scan {@link Scan} to use in this scanner * The {@link Configuration} to use.
* @param tableName The table that we wish to rangeGet * @param scan
* @param connection Connection identifying the cluster * {@link Scan} to use in this scanner
* @param tableName
* The table that we wish to rangeGet
* @param connection
* Connection identifying the cluster
* @param rpcFactory * @param rpcFactory
* Factory used to create the {@link RpcRetryingCaller}
* @param controllerFactory
* Factory used to access RPC payloads
* @param pool
* Threadpool for RPC threads
* @param primaryOperationTimeout
* Call timeout
* @throws IOException * @throws IOException
* If the remote call fails
*/ */
public ClientSmallReversedScanner(final Configuration conf, final Scan scan, public ClientSmallReversedScanner(final Configuration conf, final Scan scan,
final TableName tableName, ClusterConnection connection, final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
ExecutorService pool, int primaryOperationTimeout) throws IOException { throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
primaryOperationTimeout); primaryOperationTimeout, new SmallScannerCallableFactory());
} }
/** /**
* Gets a scanner for following scan. Move to next region or continue from the * Create a new ReversibleClientScanner for the specified table. Take note that the passed
* last result or start from the start row. * {@link Scan}'s start row may be changed.
*
* @param conf
* The {@link Configuration} to use.
* @param scan
* {@link Scan} to use in this scanner
* @param tableName
* The table that we wish to rangeGet
* @param connection
* Connection identifying the cluster
* @param rpcFactory
* Factory used to create the {@link RpcRetryingCaller}
* @param controllerFactory
* Factory used to access RPC payloads
* @param pool
* Threadpool for RPC threads
* @param primaryOperationTimeout
* Call timeout
* @param callableFactory
* Factory used to create the {@link SmallScannerCallable}
* @throws IOException
* If the remote call fails
*/
@VisibleForTesting
ClientSmallReversedScanner(final Configuration conf, final Scan scan, final TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
SmallScannerCallableFactory callableFactory) throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
primaryOperationTimeout);
this.callableFactory = callableFactory;
}
/**
* Gets a scanner for following scan. Move to next region or continue from the last result or
* start from the start row.
* *
* @param nbRows * @param nbRows
* @param done true if Server-side says we're done scanning. * @param done
* @param currentRegionDone true if scan is over on current region * true if Server-side says we're done scanning.
* @param currentRegionDone
* true if scan is over on current region
* @return true if has next scanner * @return true if has next scanner
* @throws IOException * @throws IOException
*/ */
@ -112,10 +166,9 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
+ Bytes.toStringBinary(localStartKey) + "'"); + Bytes.toStringBinary(localStartKey) + "'");
} }
smallScanCallable = ClientSmallScanner.getSmallScanCallable( smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan,
getConnection(), getTable(), scan, getScanMetrics(), localStartKey, cacheNum, getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(),
rpcControllerFactory, getPool(), getPrimaryOperationTimeout(), getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller);
getRetries(), getScannerTimeout(), getConf(), caller);
if (this.scanMetrics != null && skipRowOfFirstResult == null) { if (this.scanMetrics != null && skipRowOfFirstResult == null) {
this.scanMetrics.countOfRegions.incrementAndGet(); this.scanMetrics.countOfRegions.incrementAndGet();
@ -131,46 +184,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
return null; return null;
} }
if (cache.size() == 0) { if (cache.size() == 0) {
Result[] values = null; loadCache();
long remainingResultSize = maxScannerResultSize;
int countdown = this.caching;
boolean currentRegionDone = false;
// Values == null means server-side filter has determined we must STOP
while (remainingResultSize > 0 && countdown > 0
&& nextScanner(countdown, values == null, currentRegionDone)) {
// Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
// callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
// we do a callWithRetries
// TODO use context from server
values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
this.currentRegion = smallScanCallable.getHRegionInfo();
long currentTime = System.currentTimeMillis();
if (this.scanMetrics != null) {
this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
- lastNext);
}
lastNext = currentTime;
if (values != null && values.length > 0) {
for (int i = 0; i < values.length; i++) {
Result rs = values[i];
if (i == 0 && this.skipRowOfFirstResult != null
&& Bytes.equals(skipRowOfFirstResult, rs.getRow())) {
// Skip the first result
continue;
}
cache.add(rs);
// We don't make Iterator here
for (Cell cell : rs.rawCells()) {
remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
}
countdown--;
this.lastResult = rs;
}
}
currentRegionDone = countdown > 0;
}
} }
if (cache.size() > 0) { if (cache.size() > 0) {
@ -182,6 +196,52 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
return null; return null;
} }
@Override
protected void loadCache() throws IOException {
Result[] values = null;
long remainingResultSize = maxScannerResultSize;
int countdown = this.caching;
boolean currentRegionDone = false;
// Values == null means server-side filter has determined we must STOP
while (remainingResultSize > 0 && countdown > 0
&& nextScanner(countdown, values == null, currentRegionDone)) {
// Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
// callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
// we do a callWithRetries
values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
this.currentRegion = smallScanCallable.getHRegionInfo();
long currentTime = System.currentTimeMillis();
if (this.scanMetrics != null) {
this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
- lastNext);
}
lastNext = currentTime;
if (values != null && values.length > 0) {
for (int i = 0; i < values.length; i++) {
Result rs = values[i];
if (i == 0 && this.skipRowOfFirstResult != null
&& Bytes.equals(skipRowOfFirstResult, rs.getRow())) {
// Skip the first result
continue;
}
cache.add(rs);
// We don't make Iterator here
for (Cell cell : rs.rawCells()) {
remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
}
countdown--;
this.lastResult = rs;
}
}
if (smallScanCallable.hasMoreResultsContext()) {
currentRegionDone = !smallScanCallable.getServerHasMoreResults();
} else {
currentRegionDone = countdown > 0;
}
}
}
@Override @Override
protected void initializeScannerInConstruction() throws IOException { protected void initializeScannerInConstruction() throws IOException {
@ -195,4 +255,8 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
closed = true; closed = true;
} }
@VisibleForTesting
protected void setScannerCallableFactory(SmallScannerCallableFactory callableFactory) {
this.callableFactory = callableFactory;
}
} }

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
/** /**
@ -55,26 +56,72 @@ public class ClientSmallScanner extends ClientScanner {
// When fetching results from server, skip the first result if it has the same // When fetching results from server, skip the first result if it has the same
// row with this one // row with this one
private byte[] skipRowOfFirstResult = null; private byte[] skipRowOfFirstResult = null;
private SmallScannerCallableFactory callableFactory;
/** /**
* Create a new ShortClientScanner for the specified table Note that the * Create a new ShortClientScanner for the specified table. Take note that the passed {@link Scan}
* passed {@link Scan}'s start row maybe changed changed. * 's start row maybe changed changed.
* *
* @param conf The {@link Configuration} to use. * @param conf
* @param scan {@link Scan} to use in this scanner * The {@link Configuration} to use.
* @param tableName The table that we wish to rangeGet * @param scan
* @param connection Connection identifying the cluster * {@link Scan} to use in this scanner
* @param tableName
* The table that we wish to rangeGet
* @param connection
* Connection identifying the cluster
* @param rpcFactory * @param rpcFactory
* Factory used to create the {@link RpcRetryingCaller}
* @param controllerFactory
* Factory used to access RPC payloads
* @param pool * @param pool
* Threadpool for RPC threads
* @param primaryOperationTimeout * @param primaryOperationTimeout
* Call timeout
* @throws IOException
* If the remote call fails
*/
public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
throws IOException {
this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
primaryOperationTimeout, new SmallScannerCallableFactory());
}
/**
* Create a new ShortClientScanner for the specified table. Take note that the passed {@link Scan}
* 's start row maybe changed changed. Intended for unit tests to provide their own
* {@link SmallScannerCallableFactory} implementation/mock.
*
* @param conf
* The {@link Configuration} to use.
* @param scan
* {@link Scan} to use in this scanner
* @param tableName
* The table that we wish to rangeGet
* @param connection
* Connection identifying the cluster
* @param rpcFactory
* Factory used to create the {@link RpcRetryingCaller}
* @param controllerFactory
* Factory used to access RPC payloads
* @param pool
* Threadpool for RPC threads
* @param primaryOperationTimeout
* Call timeout
* @param callableFactory
* Factory used to create the {@link SmallScannerCallable}
* @throws IOException * @throws IOException
*/ */
public ClientSmallScanner(final Configuration conf, final Scan scan, @VisibleForTesting
final TableName tableName, ClusterConnection connection, ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName,
RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
ExecutorService pool, int primaryOperationTimeout) throws IOException { RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
SmallScannerCallableFactory callableFactory) throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
primaryOperationTimeout); primaryOperationTimeout);
this.callableFactory = callableFactory;
} }
@Override @Override
@ -125,32 +172,15 @@ public class ClientSmallScanner extends ClientScanner {
LOG.trace("Advancing internal small scanner to startKey at '" LOG.trace("Advancing internal small scanner to startKey at '"
+ Bytes.toStringBinary(localStartKey) + "'"); + Bytes.toStringBinary(localStartKey) + "'");
} }
smallScanCallable = getSmallScanCallable( smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan,
getConnection(), getTable(), scan, getScanMetrics(), localStartKey, cacheNum, getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(),
rpcControllerFactory, getPool(), getPrimaryOperationTimeout(), getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller);
getRetries(), getScannerTimeout(), getConf(), caller);
if (this.scanMetrics != null && skipRowOfFirstResult == null) { if (this.scanMetrics != null && skipRowOfFirstResult == null) {
this.scanMetrics.countOfRegions.incrementAndGet(); this.scanMetrics.countOfRegions.incrementAndGet();
} }
return true; return true;
} }
static ScannerCallableWithReplicas getSmallScanCallable(ClusterConnection connection,
TableName table, Scan scan, ScanMetrics scanMetrics, byte[] localStartKey,
final int cacheNum, RpcControllerFactory controllerFactory, ExecutorService pool,
int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf,
RpcRetryingCaller<Result[]> caller) {
scan.setStartRow(localStartKey);
SmallScannerCallable s = new SmallScannerCallable(
connection, table, scan, scanMetrics, controllerFactory, cacheNum, 0);
ScannerCallableWithReplicas scannerCallableWithReplicas =
new ScannerCallableWithReplicas(table, connection,
s, pool, primaryOperationTimeout, scan, retries,
scannerTimeout, cacheNum, conf, caller);
return scannerCallableWithReplicas;
}
static class SmallScannerCallable extends ScannerCallable { static class SmallScannerCallable extends ScannerCallable {
public SmallScannerCallable( public SmallScannerCallable(
ClusterConnection connection, TableName table, Scan scan, ClusterConnection connection, TableName table, Scan scan,
@ -202,46 +232,7 @@ public class ClientSmallScanner extends ClientScanner {
return null; return null;
} }
if (cache.size() == 0) { if (cache.size() == 0) {
Result[] values = null; loadCache();
long remainingResultSize = maxScannerResultSize;
int countdown = this.caching;
boolean currentRegionDone = false;
// Values == null means server-side filter has determined we must STOP
while (remainingResultSize > 0 && countdown > 0
&& nextScanner(countdown, values == null, currentRegionDone)) {
// Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
// callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
// we do a callWithRetries
// TODO Use the server's response about more results
values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
this.currentRegion = smallScanCallable.getHRegionInfo();
long currentTime = System.currentTimeMillis();
if (this.scanMetrics != null) {
this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
- lastNext);
}
lastNext = currentTime;
if (values != null && values.length > 0) {
for (int i = 0; i < values.length; i++) {
Result rs = values[i];
if (i == 0 && this.skipRowOfFirstResult != null
&& Bytes.equals(skipRowOfFirstResult, rs.getRow())) {
// Skip the first result
continue;
}
cache.add(rs);
// We don't make Iterator here
for (Cell cell : rs.rawCells()) {
remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
}
countdown--;
this.lastResult = rs;
}
}
currentRegionDone = countdown > 0;
}
} }
if (cache.size() > 0) { if (cache.size() > 0) {
@ -254,8 +245,80 @@ public class ClientSmallScanner extends ClientScanner {
} }
@Override @Override
protected void loadCache() throws IOException {
Result[] values = null;
long remainingResultSize = maxScannerResultSize;
int countdown = this.caching;
boolean currentRegionDone = false;
// Values == null means server-side filter has determined we must STOP
while (remainingResultSize > 0 && countdown > 0
&& nextScanner(countdown, values == null, currentRegionDone)) {
// Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
// callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
// we do a callWithRetries
values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
this.currentRegion = smallScanCallable.getHRegionInfo();
long currentTime = System.currentTimeMillis();
if (this.scanMetrics != null) {
this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
- lastNext);
}
lastNext = currentTime;
if (values != null && values.length > 0) {
for (int i = 0; i < values.length; i++) {
Result rs = values[i];
if (i == 0 && this.skipRowOfFirstResult != null
&& Bytes.equals(skipRowOfFirstResult, rs.getRow())) {
// Skip the first result
continue;
}
cache.add(rs);
// We don't make Iterator here
for (Cell cell : rs.rawCells()) {
remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
}
countdown--;
this.lastResult = rs;
}
}
if (smallScanCallable.hasMoreResultsContext()) {
// If the server has more results, the current region is not done
currentRegionDone = !smallScanCallable.getServerHasMoreResults();
} else {
// not guaranteed to get the context in older versions, fall back to checking countdown
currentRegionDone = countdown > 0;
}
}
}
public void close() { public void close() {
if (!scanMetricsPublished) writeScanMetrics(); if (!scanMetricsPublished) writeScanMetrics();
closed = true; closed = true;
} }
@VisibleForTesting
protected void setScannerCallableFactory(SmallScannerCallableFactory callableFactory) {
this.callableFactory = callableFactory;
}
@InterfaceAudience.Private
protected static class SmallScannerCallableFactory {
public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller<Result[]> caller) {
scan.setStartRow(localStartKey);
SmallScannerCallable s = new SmallScannerCallable(
connection, table, scan, scanMetrics, controllerFactory, cacheNum, 0);
ScannerCallableWithReplicas scannerCallableWithReplicas =
new ScannerCallableWithReplicas(table, connection,
s, pool, primaryOperationTimeout, scan, retries,
scannerTimeout, cacheNum, conf, caller);
return scannerCallableWithReplicas;
}
}
} }

View File

@ -0,0 +1,349 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* Test the ClientSmallReversedScanner.
*/
@Category(SmallTests.class)
public class TestClientSmallReversedScanner {
Scan scan;
ExecutorService pool;
Configuration conf;
ClusterConnection clusterConn;
RpcRetryingCallerFactory rpcFactory;
RpcControllerFactory controllerFactory;
RpcRetryingCaller<Result[]> caller;
@Before
@SuppressWarnings({"deprecation", "unchecked"})
public void setup() throws IOException {
clusterConn = Mockito.mock(ClusterConnection.class);
rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);
controllerFactory = Mockito.mock(RpcControllerFactory.class);
pool = Executors.newSingleThreadExecutor();
scan = new Scan();
conf = new Configuration();
Mockito.when(clusterConn.getConfiguration()).thenReturn(conf);
// Mock out the RpcCaller
caller = Mockito.mock(RpcRetryingCaller.class);
// Return the mock from the factory
Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
}
@After
public void teardown() {
if (null != pool) {
pool.shutdownNow();
}
}
/**
* Create a simple Answer which returns true the first time, and false every time after.
*/
private Answer<Boolean> createTrueThenFalseAnswer() {
return new Answer<Boolean>() {
boolean first = true;
@Override
public Boolean answer(InvocationOnMock invocation) {
if (first) {
first = false;
return true;
}
return false;
}
};
}
private SmallScannerCallableFactory getFactory(
final ScannerCallableWithReplicas callableWithReplicas) {
return new SmallScannerCallableFactory() {
@Override
public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
RpcControllerFactory controllerFactory, ExecutorService pool,
int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf,
RpcRetryingCaller<Result[]> caller) {
return callableWithReplicas;
}
};
}
@Test
public void testContextPresent() throws Exception {
final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
Type.Maximum);
ScannerCallableWithReplicas callableWithReplicas = Mockito
.mock(ScannerCallableWithReplicas.class);
// Mock out the RpcCaller
@SuppressWarnings("unchecked")
RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
// Return the mock from the factory
Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
// Intentionally leave a "default" caching size in the Scan. No matter the value, we
// should continue based on the server context
SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
Integer.MAX_VALUE)) {
csrs.setScannerCallableFactory(factory);
// Return some data the first time, less the second, and none after that
Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout()))
.thenAnswer(new Answer<Result[]>() {
int count = 0;
@Override
public Result[] answer(InvocationOnMock invocation) {
Result[] results;
if (0 == count) {
results = new Result[] {Result.create(new Cell[] {kv3}),
Result.create(new Cell[] {kv2})};
} else if (1 == count) {
results = new Result[] {Result.create(new Cell[] {kv1})};
} else {
results = new Result[0];
}
count++;
return results;
}
});
// Pass back the context always
Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true);
// Only have more results the first time
Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenAnswer(
createTrueThenFalseAnswer());
// A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
// Trigger the "no more data" branch for #nextScanner(...)
Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
csrs.loadCache();
List<Result> results = csrs.cache;
Iterator<Result> iter = results.iterator();
assertEquals(3, results.size());
for (int i = 3; i >= 1 && iter.hasNext(); i--) {
Result result = iter.next();
byte[] row = result.getRow();
assertEquals("row" + i, new String(row, StandardCharsets.UTF_8));
assertEquals(1, result.getMap().size());
}
assertTrue(csrs.closed);
}
}
@Test
public void testNoContextFewerRecords() throws Exception {
final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
Type.Maximum);
ScannerCallableWithReplicas callableWithReplicas = Mockito
.mock(ScannerCallableWithReplicas.class);
// While the server returns 2 records per batch, we expect more records.
scan.setCaching(2);
SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
Integer.MAX_VALUE)) {
csrs.setScannerCallableFactory(factory);
// Return some data the first time, less the second, and none after that
Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout()))
.thenAnswer(new Answer<Result[]>() {
int count = 0;
@Override
public Result[] answer(InvocationOnMock invocation) {
Result[] results;
if (0 == count) {
results = new Result[] {Result.create(new Cell[] {kv3}),
Result.create(new Cell[] {kv2})};
} else if (1 == count) {
// Return fewer records than expected (2)
results = new Result[] {Result.create(new Cell[] {kv1})};
} else {
throw new RuntimeException("Should not fetch a third batch from the server");
}
count++;
return results;
}
});
// Server doesn't return the context
Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false);
// getServerHasMoreResults shouldn't be called when hasMoreResultsContext returns false
Mockito.when(callableWithReplicas.getServerHasMoreResults())
.thenThrow(new RuntimeException("Should not be called"));
// A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
// Trigger the "no more data" branch for #nextScanner(...)
Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
csrs.loadCache();
List<Result> results = csrs.cache;
Iterator<Result> iter = results.iterator();
assertEquals(2, results.size());
for (int i = 3; i >= 2 && iter.hasNext(); i--) {
Result result = iter.next();
byte[] row = result.getRow();
assertEquals("row" + i, new String(row, StandardCharsets.UTF_8));
assertEquals(1, result.getMap().size());
}
// "consume" the Results
results.clear();
csrs.loadCache();
assertEquals(1, results.size());
Result result = results.get(0);
assertEquals("row1", new String(result.getRow(), StandardCharsets.UTF_8));
assertEquals(1, result.getMap().size());
assertTrue(csrs.closed);
}
}
@Test
public void testNoContextNoRecords() throws Exception {
ScannerCallableWithReplicas callableWithReplicas = Mockito
.mock(ScannerCallableWithReplicas.class);
// While the server return 2 records per RPC, we expect there to be more records.
scan.setCaching(2);
SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
Integer.MAX_VALUE)) {
csrs.setScannerCallableFactory(factory);
// Return some data the first time, less the second, and none after that
Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout()))
.thenReturn(new Result[0]);
// Server doesn't return the context
Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false);
// Only have more results the first time
Mockito.when(callableWithReplicas.getServerHasMoreResults())
.thenThrow(new RuntimeException("Should not be called"));
// A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
// Trigger the "no more data" branch for #nextScanner(...)
Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
csrs.loadCache();
assertEquals(0, csrs.cache.size());
assertTrue(csrs.closed);
}
}
@Test
public void testContextNoRecords() throws Exception {
ScannerCallableWithReplicas callableWithReplicas = Mockito
.mock(ScannerCallableWithReplicas.class);
SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
Integer.MAX_VALUE)) {
csrs.setScannerCallableFactory(factory);
// Return some data the first time, less the second, and none after that
Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout()))
.thenReturn(new Result[0]);
// Server doesn't return the context
Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true);
// Only have more results the first time
Mockito.when(callableWithReplicas.getServerHasMoreResults())
.thenReturn(false);
// A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
// Trigger the "no more data" branch for #nextScanner(...)
Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
csrs.loadCache();
assertEquals(0, csrs.cache.size());
assertTrue(csrs.closed);
}
}
}

View File

@ -0,0 +1,339 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* Test the ClientSmallScanner.
*/
@Category(SmallTests.class)
public class TestClientSmallScanner {
Scan scan;
ExecutorService pool;
Configuration conf;
ClusterConnection clusterConn;
RpcRetryingCallerFactory rpcFactory;
RpcControllerFactory controllerFactory;
RpcRetryingCaller<Result[]> caller;
@Before
@SuppressWarnings({"deprecation", "unchecked"})
public void setup() throws IOException {
clusterConn = Mockito.mock(ClusterConnection.class);
rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);
controllerFactory = Mockito.mock(RpcControllerFactory.class);
pool = Executors.newSingleThreadExecutor();
scan = new Scan();
conf = new Configuration();
Mockito.when(clusterConn.getConfiguration()).thenReturn(conf);
// Mock out the RpcCaller
caller = Mockito.mock(RpcRetryingCaller.class);
// Return the mock from the factory
Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
}
@After
public void teardown() {
if (null != pool) {
pool.shutdownNow();
}
}
/**
* Create a simple Answer which returns true the first time, and false every time after.
*/
private Answer<Boolean> createTrueThenFalseAnswer() {
return new Answer<Boolean>() {
boolean first = true;
@Override
public Boolean answer(InvocationOnMock invocation) {
if (first) {
first = false;
return true;
}
return false;
}
};
}
private SmallScannerCallableFactory getFactory(
final ScannerCallableWithReplicas callableWithReplicas) {
return new SmallScannerCallableFactory() {
@Override
public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
RpcControllerFactory controllerFactory, ExecutorService pool,
int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf,
RpcRetryingCaller<Result[]> caller) {
return callableWithReplicas;
}
};
}
@Test
public void testContextPresent() throws Exception {
final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
Type.Maximum);
ScannerCallableWithReplicas callableWithReplicas = Mockito
.mock(ScannerCallableWithReplicas.class);
// Mock out the RpcCaller
@SuppressWarnings("unchecked")
RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
// Return the mock from the factory
Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
// Intentionally leave a "default" caching size in the Scan. No matter the value, we
// should continue based on the server context
try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"),
clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
css.setScannerCallableFactory(factory);
// Return some data the first time, less the second, and none after that
Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout()))
.thenAnswer(new Answer<Result[]>() {
int count = 0;
@Override
public Result[] answer(InvocationOnMock invocation) {
Result[] results;
if (0 == count) {
results = new Result[] {Result.create(new Cell[] {kv1}),
Result.create(new Cell[] {kv2})};
} else if (1 == count) {
results = new Result[] {Result.create(new Cell[] {kv3})};
} else {
results = new Result[0];
}
count++;
return results;
}
});
// Pass back the context always
Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true);
// Only have more results the first time
Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenAnswer(
createTrueThenFalseAnswer());
// A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
// Trigger the "no more data" branch for #nextScanner(...)
Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
css.loadCache();
List<Result> results = css.cache;
assertEquals(3, results.size());
for (int i = 1; i <= 3; i++) {
Result result = results.get(i - 1);
byte[] row = result.getRow();
assertEquals("row" + i, new String(row, StandardCharsets.UTF_8));
assertEquals(1, result.getMap().size());
}
assertTrue(css.closed);
}
}
@Test
public void testNoContextFewerRecords() throws Exception {
final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
Type.Maximum);
ScannerCallableWithReplicas callableWithReplicas = Mockito
.mock(ScannerCallableWithReplicas.class);
// While the server returns 2 records per batch, we expect more records.
scan.setCaching(2);
SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"),
clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
css.setScannerCallableFactory(factory);
// Return some data the first time, less the second, and none after that
Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout()))
.thenAnswer(new Answer<Result[]>() {
int count = 0;
@Override
public Result[] answer(InvocationOnMock invocation) {
Result[] results;
if (0 == count) {
results = new Result[] {Result.create(new Cell[] {kv1}),
Result.create(new Cell[] {kv2})};
} else if (1 == count) {
// Return fewer records than expected (2)
results = new Result[] {Result.create(new Cell[] {kv3})};
} else {
throw new RuntimeException("Should not fetch a third batch from the server");
}
count++;
return results;
}
});
// Server doesn't return the context
Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false);
// Only have more results the first time
Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenThrow(
new RuntimeException("Should not be called"));
// A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
// Trigger the "no more data" branch for #nextScanner(...)
Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
css.loadCache();
List<Result> results = css.cache;
assertEquals(2, results.size());
for (int i = 1; i <= 2; i++) {
Result result = results.get(i - 1);
byte[] row = result.getRow();
assertEquals("row" + i, new String(row, StandardCharsets.UTF_8));
assertEquals(1, result.getMap().size());
}
// "consume" the results we verified
results.clear();
css.loadCache();
assertEquals(1, results.size());
Result result = results.get(0);
assertEquals("row3", new String(result.getRow(), StandardCharsets.UTF_8));
assertEquals(1, result.getMap().size());
assertTrue(css.closed);
}
}
@Test
public void testNoContextNoRecords() throws Exception {
ScannerCallableWithReplicas callableWithReplicas = Mockito
.mock(ScannerCallableWithReplicas.class);
// While the server return 2 records per RPC, we expect there to be more records.
scan.setCaching(2);
SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"),
clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
css.setScannerCallableFactory(factory);
// Return some data the first time, less the second, and none after that
Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout()))
.thenReturn(new Result[0]);
// Server doesn't return the context
Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false);
// Only have more results the first time
Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenThrow(
new RuntimeException("Should not be called"));
// A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
// Trigger the "no more data" branch for #nextScanner(...)
Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
css.loadCache();
assertEquals(0, css.cache.size());
assertTrue(css.closed);
}
}
@Test
public void testContextNoRecords() throws Exception {
ScannerCallableWithReplicas callableWithReplicas = Mockito
.mock(ScannerCallableWithReplicas.class);
SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"),
clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
css.setScannerCallableFactory(factory);
// Return some data the first time, less the second, and none after that
Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout()))
.thenReturn(new Result[0]);
// Server doesn't return the context
Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true);
// Only have more results the first time
Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenReturn(false);
// A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
// Trigger the "no more data" branch for #nextScanner(...)
Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
css.loadCache();
assertEquals(0, css.cache.size());
assertTrue(css.closed);
}
}
}