HBASE-16932 Implement small scan

This commit is contained in:
zhangduo 2016-10-26 17:21:35 +08:00
parent 5cee6a39c2
commit cd3dd6e018
14 changed files with 781 additions and 126 deletions

View File

@ -20,11 +20,18 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
import static org.apache.hadoop.hbase.HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
@ -34,6 +41,7 @@ import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
@ -59,6 +67,13 @@ class AsyncConnectionConfiguration {
/** How many retries are allowed before we start to log */
private final int startLogErrorsCnt;
private final long scanTimeoutNs;
private final int scannerCaching;
private final long scannerMaxResultSize;
@SuppressWarnings("deprecation")
AsyncConnectionConfiguration(Configuration conf) {
this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
@ -68,11 +83,18 @@ class AsyncConnectionConfiguration {
conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)));
this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY,
conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)));
this.pauseNs = TimeUnit.MILLISECONDS
.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
this.pauseNs =
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY,
DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
this.startLogErrorsCnt =
conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
this.scanTimeoutNs = TimeUnit.MILLISECONDS
.toNanos(HBaseConfiguration.getInt(conf, HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
this.scannerCaching =
conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
}
long getMetaOperationTimeoutNs() {
@ -103,4 +125,15 @@ class AsyncConnectionConfiguration {
return startLogErrorsCnt;
}
public long getScanTimeoutNs() {
return scanTimeoutNs;
}
public int getScannerCaching() {
return scannerCaching;
}
public long getScannerMaxResultSize() {
return scannerMaxResultSize;
}
}

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
@ -27,6 +29,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
/**
* TODO: reimplement using aync connection when the scan logic is ready. The current implementation
@ -52,6 +55,26 @@ class AsyncRegionLocator implements Closeable {
return future;
}
CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName,
byte[] startRowOfCurrentRegion, boolean reload) {
CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
byte[] toLocateRow = createClosestRowBefore(startRowOfCurrentRegion);
try {
for (;;) {
HRegionLocation loc = conn.getRegionLocation(tableName, toLocateRow, reload);
byte[] endKey = loc.getRegionInfo().getEndKey();
if (Bytes.equals(startRowOfCurrentRegion, endKey)) {
future.complete(loc);
break;
}
toLocateRow = endKey;
}
} catch (IOException e) {
future.completeExceptionally(e);
}
return future;
}
void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row, Object exception,
ServerName source) {
conn.updateCachedLocations(tableName, regionName, row, exception, source);

View File

@ -17,10 +17,12 @@
*/
package org.apache.hadoop.hbase.client;
import com.google.common.base.Preconditions;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import io.netty.util.HashedWheelTimer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@ -54,6 +56,8 @@ class AsyncRpcRetryingCallerFactory {
private long rpcTimeoutNs = -1L;
private boolean locateToPreviousRegion;
public SingleRequestCallerBuilder<T> table(TableName tableName) {
this.tableName = tableName;
return this;
@ -64,8 +68,8 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
public SingleRequestCallerBuilder<T> action(
AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
public SingleRequestCallerBuilder<T>
action(AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) {
this.callable = callable;
return this;
}
@ -80,11 +84,18 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
public SingleRequestCallerBuilder<T> locateToPreviousRegion(boolean locateToPreviousRegion) {
this.locateToPreviousRegion = locateToPreviousRegion;
return this;
}
public AsyncSingleRequestRpcRetryingCaller<T> build() {
return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn,
Preconditions.checkNotNull(tableName, "tableName is null"),
Preconditions.checkNotNull(row, "row is null"),
Preconditions.checkNotNull(callable, "action is null"), conn.connConf.getPauseNs(),
checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"),
locateToPreviousRegion
? (c, tn, r, re) -> c.getLocator().getPreviousRegionLocation(tn, r, re)
: (c, tn, r, re) -> c.getLocator().getRegionLocation(tn, r, re),
checkNotNull(callable, "action is null"), conn.connConf.getPauseNs(),
conn.connConf.getMaxRetries(), operationTimeoutNs, rpcTimeoutNs,
conn.connConf.getStartLogErrorsCnt());
}
@ -103,4 +114,64 @@ class AsyncRpcRetryingCallerFactory {
public <T> SingleRequestCallerBuilder<T> single() {
return new SingleRequestCallerBuilder<>();
}
public class SmallScanCallerBuilder {
private TableName tableName;
private Scan scan;
private int limit;
private long scanTimeoutNs = -1L;
private long rpcTimeoutNs = -1L;
public SmallScanCallerBuilder table(TableName tableName) {
this.tableName = tableName;
return this;
}
public SmallScanCallerBuilder setScan(Scan scan) {
this.scan = scan;
return this;
}
public SmallScanCallerBuilder limit(int limit) {
this.limit = limit;
return this;
}
public SmallScanCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) {
this.scanTimeoutNs = unit.toNanos(scanTimeout);
return this;
}
public SmallScanCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
return this;
}
public AsyncSmallScanRpcRetryingCaller build() {
TableName tableName = checkNotNull(this.tableName, "tableName is null");
Scan scan = checkNotNull(this.scan, "scan is null");
checkArgument(limit > 0, "invalid limit %d", limit);
return new AsyncSmallScanRpcRetryingCaller(conn, tableName, scan, limit, scanTimeoutNs,
rpcTimeoutNs);
}
/**
* Shortcut for {@code build().call()}
*/
public CompletableFuture<List<Result>> call() {
return build().call();
}
}
/**
* Create retry caller for small scan.
*/
public SmallScanCallerBuilder smallScan() {
return new SmallScanCallerBuilder();
}
}

View File

@ -60,6 +60,12 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
ClientService.Interface stub);
}
@FunctionalInterface
public interface RegionLocator {
CompletableFuture<HRegionLocation> locate(AsyncConnectionImpl conn, TableName tableName,
byte[] row, boolean reload);
}
private final HashedWheelTimer retryTimer;
private final AsyncConnectionImpl conn;
@ -68,6 +74,8 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
private final byte[] row;
private final RegionLocator locator;
private final Callable<T> callable;
private final long pauseNs;
@ -89,12 +97,13 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
private final long startNs;
public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
TableName tableName, byte[] row, Callable<T> callable, long pauseNs, int maxRetries,
long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
TableName tableName, byte[] row, RegionLocator locator, Callable<T> callable, long pauseNs,
int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.tableName = tableName;
this.row = row;
this.locator = locator;
this.callable = callable;
this.pauseNs = pauseNs;
this.maxAttempts = retries2Attempts(maxRetries);
@ -207,7 +216,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
}
private void locateThenCall() {
conn.getLocator().getRegionLocation(tableName, row, tries > 1).whenComplete((loc, error) -> {
locator.locate(conn, tableName, row, tries > 1).whenComplete((loc, error) -> {
if (error != null) {
onError(error,
() -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed, tries = "

View File

@ -0,0 +1,211 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Retry caller for smaller scan.
*/
@InterfaceAudience.Private
class AsyncSmallScanRpcRetryingCaller {
private final AsyncConnectionImpl conn;
private final TableName tableName;
private final Scan scan;
private final int limit;
private final long scanTimeoutNs;
private final long rpcTimeoutNs;
private final Function<byte[], byte[]> createClosestNextRow;
private final Runnable firstScan;
private final Function<HRegionInfo, Boolean> nextScan;
private final List<Result> resultList;
private final CompletableFuture<List<Result>> future;
public AsyncSmallScanRpcRetryingCaller(AsyncConnectionImpl conn, TableName tableName, Scan scan,
int limit, long scanTimeoutNs, long rpcTimeoutNs) {
this.conn = conn;
this.tableName = tableName;
this.scan = scan;
this.limit = limit;
this.scanTimeoutNs = scanTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
if (scan.isReversed()) {
this.createClosestNextRow = ConnectionUtils::createClosestRowBefore;
this.firstScan = this::reversedFirstScan;
this.nextScan = this::reversedNextScan;
} else {
this.createClosestNextRow = ConnectionUtils::createClosestRowAfter;
this.firstScan = this::firstScan;
this.nextScan = this::nextScan;
}
this.resultList = new ArrayList<>();
this.future = new CompletableFuture<>();
}
private static final class SmallScanResponse {
public final Result[] results;
public final HRegionInfo currentRegion;
public final boolean hasMoreResultsInRegion;
public SmallScanResponse(Result[] results, HRegionInfo currentRegion,
boolean hasMoreResultsInRegion) {
this.results = results;
this.currentRegion = currentRegion;
this.hasMoreResultsInRegion = hasMoreResultsInRegion;
}
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
justification = "Findbugs seems to be confused by lambda expression.")
private CompletableFuture<SmallScanResponse> scan(HBaseRpcController controller,
HRegionLocation loc, ClientService.Interface stub) {
CompletableFuture<SmallScanResponse> future = new CompletableFuture<>();
ScanRequest req;
try {
req = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), scan,
limit - resultList.size(), true);
} catch (IOException e) {
future.completeExceptionally(e);
return future;
}
stub.scan(controller, req, resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
try {
Result[] results = ResponseConverter.getResults(controller.cellScanner(), resp);
future.complete(
new SmallScanResponse(results, loc.getRegionInfo(), resp.getMoreResultsInRegion()));
} catch (IOException e) {
future.completeExceptionally(e);
}
}
});
return future;
}
private void onComplete(SmallScanResponse resp) {
resultList.addAll(Arrays.asList(resp.results));
if (resultList.size() == limit) {
future.complete(resultList);
return;
}
if (resp.hasMoreResultsInRegion) {
if (resp.results.length > 0) {
scan.setStartRow(
createClosestNextRow.apply(resp.results[resp.results.length - 1].getRow()));
}
scan(false);
return;
}
if (!nextScan.apply(resp.currentRegion)) {
future.complete(resultList);
}
}
private void scan(boolean locateToPreviousRegion) {
conn.callerFactory.<SmallScanResponse> single().table(tableName).row(scan.getStartRow())
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS)
.locateToPreviousRegion(locateToPreviousRegion).action(this::scan).call()
.whenComplete((resp, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else {
onComplete(resp);
}
});
}
public CompletableFuture<List<Result>> call() {
firstScan.run();
return future;
}
private void firstScan() {
scan(false);
}
private void reversedFirstScan() {
scan(isEmptyStartRow(scan.getStartRow()));
}
private boolean nextScan(HRegionInfo region) {
if (isEmptyStopRow(scan.getStopRow())) {
if (isEmptyStopRow(region.getEndKey())) {
return false;
}
} else {
if (Bytes.compareTo(region.getEndKey(), scan.getStopRow()) >= 0) {
return false;
}
}
scan.setStartRow(region.getEndKey());
scan(false);
return true;
}
private boolean reversedNextScan(HRegionInfo region) {
if (isEmptyStopRow(scan.getStopRow())) {
if (isEmptyStartRow(region.getStartKey())) {
return false;
}
} else {
if (Bytes.compareTo(region.getStartKey(), scan.getStopRow()) <= 0) {
return false;
}
}
scan.setStartRow(region.getStartKey());
scan(true);
return true;
}
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
import com.google.common.base.Preconditions;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@ -312,4 +313,26 @@ public interface AsyncTable {
*/
CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, RowMutations mutation);
/**
* Just call {@link #smallScan(Scan, int)} with {@link Integer#MAX_VALUE}.
* @see #smallScan(Scan, int)
*/
default CompletableFuture<List<Result>> smallScan(Scan scan) {
return smallScan(scan, Integer.MAX_VALUE);
}
/**
* Return all the results that match the given scan object. The number of the returned results
* will not be greater than {@code limit}.
* <p>
* Notice that the scan must be small, and should not use batch or allowPartialResults. The
* {@code caching} property of the scan object is also ignored as we will use {@code limit}
* instead.
* @param scan A configured {@link Scan} object.
* @param limit the limit of results count
* @return The results of this small scan operation. The return value will be wrapped by a
* {@link CompletableFuture}.
*/
CompletableFuture<List<Result>> smallScan(Scan scan, int limit);
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResp
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
/**
* The implementation of AsyncTable.
@ -57,12 +59,18 @@ class AsyncTableImpl implements AsyncTable {
private final TableName tableName;
private final int defaultScannerCaching;
private final long defaultScannerMaxResultSize;
private long readRpcTimeoutNs;
private long writeRpcTimeoutNs;
private long operationTimeoutNs;
private long scanTimeoutNs;
public AsyncTableImpl(AsyncConnectionImpl conn, TableName tableName) {
this.conn = conn;
this.tableName = tableName;
@ -70,6 +78,9 @@ class AsyncTableImpl implements AsyncTable {
this.writeRpcTimeoutNs = conn.connConf.getWriteRpcTimeoutNs();
this.operationTimeoutNs = tableName.isSystemTable() ? conn.connConf.getMetaOperationTimeoutNs()
: conn.connConf.getOperationTimeoutNs();
this.defaultScannerCaching = conn.connConf.getScannerCaching();
this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
this.scanTimeoutNs = conn.connConf.getScanTimeoutNs();
}
@Override
@ -256,8 +267,8 @@ class AsyncTableImpl implements AsyncTable {
future.completeExceptionally(controller.getFailed());
} else {
try {
org.apache.hadoop.hbase.client.MultiResponse multiResp = ResponseConverter
.getResults(req, resp, controller.cellScanner());
org.apache.hadoop.hbase.client.MultiResponse multiResp =
ResponseConverter.getResults(req, resp, controller.cellScanner());
Throwable ex = multiResp.getException(regionName);
if (ex != null) {
future
@ -305,6 +316,38 @@ class AsyncTableImpl implements AsyncTable {
.call();
}
private <T> CompletableFuture<T> failedFuture(Throwable error) {
CompletableFuture<T> future = new CompletableFuture<>();
future.completeExceptionally(error);
return future;
}
private Scan setDefaultScanConfig(Scan scan) {
// always create a new scan object as we may reset the start row later.
Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan);
if (newScan.getCaching() <= 0) {
newScan.setCaching(defaultScannerCaching);
}
if (newScan.getMaxResultSize() <= 0) {
newScan.setMaxResultSize(defaultScannerMaxResultSize);
}
return newScan;
}
@Override
public CompletableFuture<List<Result>> smallScan(Scan scan, int limit) {
if (!scan.isSmall()) {
return failedFuture(new IllegalArgumentException("Only small scan is allowed"));
}
if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
return failedFuture(
new IllegalArgumentException("Batch and allowPartial is not allowed for small scan"));
}
return conn.callerFactory.smallScan().table(tableName).setScan(setDefaultScanConfig(scan))
.limit(limit).scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS)
.rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
}
@Override
public void setReadRpcTimeout(long timeout, TimeUnit unit) {
this.readRpcTimeoutNs = unit.toNanos(timeout);

View File

@ -17,7 +17,20 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -35,20 +48,11 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
/**
* Implements the scanner interface for the HBase client.
* If there are multiple regions in a table, this scanner will iterate
@ -57,9 +61,7 @@ import java.util.concurrent.ExecutorService;
@InterfaceAudience.Private
public abstract class ClientScanner extends AbstractClientScanner {
private static final Log LOG = LogFactory.getLog(ClientScanner.class);
// A byte array in which all elements are the max byte, and it is used to
// construct closest front row
static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
protected Scan scan;
protected boolean closed = false;
// Current region scanner is against. Gets cleared if current region goes
@ -75,8 +77,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
protected final LinkedList<Result> partialResults = new LinkedList<Result>();
/**
* The row for which we are accumulating partial Results (i.e. the row of the Results stored
* inside partialResults). Changes to partialResultsRow and partialResults are kept in sync
* via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()}
* inside partialResults). Changes to partialResultsRow and partialResults are kept in sync via
* the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()}
*/
protected byte[] partialResultsRow = null;
/**
@ -92,14 +94,14 @@ public abstract class ClientScanner extends AbstractClientScanner {
private final TableName tableName;
protected final int scannerTimeout;
protected boolean scanMetricsPublished = false;
protected RpcRetryingCaller<Result []> caller;
protected RpcRetryingCaller<Result[]> caller;
protected RpcControllerFactory rpcControllerFactory;
protected Configuration conf;
//The timeout on the primary. Applicable if there are multiple replicas for a region
//In that case, we will only wait for this much timeout on the primary before going
//to the replicas and trying the same scan. Note that the retries will still happen
//on each replica and the first successful results will be taken. A timeout of 0 is
//disallowed.
// The timeout on the primary. Applicable if there are multiple replicas for a region
// In that case, we will only wait for this much timeout on the primary before going
// to the replicas and trying the same scan. Note that the retries will still happen
// on each replica and the first successful results will be taken. A timeout of 0 is
// disallowed.
protected final int primaryOperationTimeout;
private int retries;
protected final ExecutorService pool;
@ -447,7 +449,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
if (scan.isReversed()) {
scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
} else {
scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
scan.setStartRow(createClosestRowAfter(lastResult.getRow()));
}
} else {
// we need rescan this row because we only loaded partial row before
@ -760,28 +762,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
closed = true;
}
/**
* Create the closest row before the specified row
* @param row
* @return a new byte array which is the closest front row of the specified one
*/
protected static byte[] createClosestRowBefore(byte[] row) {
if (row == null) {
throw new IllegalArgumentException("The passed row is empty");
}
if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) {
return MAX_BYTE_ARRAY;
}
if (row[row.length - 1] == 0) {
return Arrays.copyOf(row, row.length - 1);
} else {
byte[] closestFrontRow = Arrays.copyOf(row, row.length);
closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1);
closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY);
return closestFrontRow;
}
}
@Override
public boolean renewLease() {
if (callable != null) {

View File

@ -16,9 +16,11 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
@ -36,8 +38,6 @@ import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFac
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.annotations.VisibleForTesting;
/**
* <p>
* Client scanner for small reversed scan. Generally, only one RPC is called to fetch the

View File

@ -17,12 +17,16 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
@ -33,10 +37,11 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Utility used by client connections.
@ -222,4 +227,41 @@ public final class ConnectionUtils {
return HConstants.NO_NONCE;
}
};
// A byte array in which all elements are the max byte, and it is used to
// construct closest front row
static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
/**
* Create the closest row after the specified row
*/
static byte[] createClosestRowAfter(byte[] row) {
return Arrays.copyOf(row, row.length + 1);
}
/**
* Create the closest row before the specified row
*/
static byte[] createClosestRowBefore(byte[] row) {
if (row.length == 0) {
return MAX_BYTE_ARRAY;
}
if (row[row.length - 1] == 0) {
return Arrays.copyOf(row, row.length - 1);
} else {
byte[] nextRow = new byte[row.length + MAX_BYTE_ARRAY.length];
System.arraycopy(row, 0, nextRow, 0, row.length - 1);
nextRow[row.length - 1] = (byte) ((row[row.length - 1] & 0xFF) - 1);
System.arraycopy(nextRow, row.length, MAX_BYTE_ARRAY, 0, MAX_BYTE_ARRAY.length);
return nextRow;
}
}
static boolean isEmptyStartRow(byte[] row) {
return Bytes.equals(row, EMPTY_START_ROW);
}
static boolean isEmptyStopRow(byte[] row) {
return Bytes.equals(row, EMPTY_END_ROW);
}
}

View File

@ -18,6 +18,8 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
import java.io.IOException;
import java.util.concurrent.ExecutorService;

View File

@ -18,7 +18,10 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefore;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
@ -30,7 +33,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@ -40,12 +42,8 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import com.google.common.annotations.VisibleForTesting;
/**
* This class has the logic for handling scanners for regions with and without replicas.
* 1. A scan is attempted on the default (primary) region
@ -341,7 +339,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
if (callable.getScan().isReversed()) {
callable.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
} else {
callable.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
callable.getScan().setStartRow(createClosestRowAfter(this.lastResult.getRow()));
}
}
}

View File

@ -0,0 +1,219 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncTableSmallScan {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("async");
private static byte[] FAMILY = Bytes.toBytes("cf");
private static byte[] QUALIFIER = Bytes.toBytes("cq");
private static int COUNT = 1000;
private static AsyncConnection ASYNC_CONN;
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.startMiniCluster(3);
byte[][] splitKeys = new byte[8][];
for (int i = 111; i < 999; i += 111) {
splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
}
TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
List<CompletableFuture<?>> futures = new ArrayList<>();
IntStream.range(0, COUNT)
.forEach(i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i)))
.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)))));
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
}
@AfterClass
public static void tearDown() throws Exception {
ASYNC_CONN.close();
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testScanAll() throws InterruptedException, ExecutionException {
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
List<Result> results = table.smallScan(new Scan().setSmall(true)).get();
assertEquals(COUNT, results.size());
IntStream.range(0, COUNT).forEach(i -> {
Result result = results.get(i);
assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
assertEquals(i, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
});
}
@Test
public void testReversedScanAll() throws InterruptedException, ExecutionException {
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
List<Result> results = table.smallScan(new Scan().setSmall(true).setReversed(true)).get();
assertEquals(COUNT, results.size());
IntStream.range(0, COUNT).forEach(i -> {
Result result = results.get(i);
int actualIndex = COUNT - i - 1;
assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
});
}
@Test
public void testScanNoStopKey() throws InterruptedException, ExecutionException {
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
int start = 345;
List<Result> results = table
.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start))).setSmall(true)).get();
assertEquals(COUNT - start, results.size());
IntStream.range(0, COUNT - start).forEach(i -> {
Result result = results.get(i);
int actualIndex = start + i;
assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
});
}
@Test
public void testReverseScanNoStopKey() throws InterruptedException, ExecutionException {
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
int start = 765;
List<Result> results = table
.smallScan(
new Scan(Bytes.toBytes(String.format("%03d", start))).setSmall(true).setReversed(true))
.get();
assertEquals(start + 1, results.size());
IntStream.range(0, start + 1).forEach(i -> {
Result result = results.get(i);
int actualIndex = start - i;
assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
});
}
private void testScan(int start, int stop) throws InterruptedException, ExecutionException {
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
List<Result> results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start)))
.setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true)).get();
assertEquals(stop - start, results.size());
IntStream.range(0, stop - start).forEach(i -> {
Result result = results.get(i);
int actualIndex = start + i;
assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
});
}
private void testReversedScan(int start, int stop)
throws InterruptedException, ExecutionException {
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
List<Result> results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start)))
.setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true).setReversed(true))
.get();
assertEquals(start - stop, results.size());
IntStream.range(0, start - stop).forEach(i -> {
Result result = results.get(i);
int actualIndex = start - i;
assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
});
}
@Test
public void testScanWithStartKeyAndStopKey() throws InterruptedException, ExecutionException {
testScan(345, 567);
}
@Test
public void testReversedScanWithStartKeyAndStopKey()
throws InterruptedException, ExecutionException {
testReversedScan(765, 543);
}
@Test
public void testScanAtRegionBoundary() throws InterruptedException, ExecutionException {
testScan(222, 333);
}
@Test
public void testReversedScanAtRegionBoundary() throws InterruptedException, ExecutionException {
testScan(222, 333);
}
@Test
public void testScanWithLimit() throws InterruptedException, ExecutionException {
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
int start = 111;
int stop = 888;
int limit = 300;
List<Result> results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start)))
.setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true),
limit).get();
assertEquals(limit, results.size());
IntStream.range(0, limit).forEach(i -> {
Result result = results.get(i);
int actualIndex = start + i;
assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
});
}
@Test
public void testReversedScanWithLimit() throws InterruptedException, ExecutionException {
AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
int start = 888;
int stop = 111;
int limit = 300;
List<Result> results = table.smallScan(
new Scan(Bytes.toBytes(String.format("%03d", start)))
.setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true).setReversed(true),
limit).get();
assertEquals(limit, results.size());
IntStream.range(0, limit).forEach(i -> {
Result result = results.get(i);
int actualIndex = start - i;
assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
});
}
}

View File

@ -26,6 +26,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@ -5933,7 +5934,7 @@ public class TestFromClientSide {
public void testReversedScanUnderMultiRegions() throws Exception {
// Test Initialization.
TableName TABLE = TableName.valueOf("testReversedScanUnderMultiRegions");
byte[] maxByteArray = ReversedClientScanner.MAX_BYTE_ARRAY;
byte[] maxByteArray = ConnectionUtils.MAX_BYTE_ARRAY;
byte[][] splitRows = new byte[][] { Bytes.toBytes("005"),
Bytes.add(Bytes.toBytes("005"), Bytes.multiple(maxByteArray, 16)),
Bytes.toBytes("006"),