From cd3dd6e018513357d2cf0b5bba073f5a6551f7a4 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Wed, 26 Oct 2016 17:21:35 +0800 Subject: [PATCH] HBASE-16932 Implement small scan --- .../client/AsyncConnectionConfiguration.java | 41 +++- .../hbase/client/AsyncRegionLocator.java | 23 ++ .../client/AsyncRpcRetryingCallerFactory.java | 83 ++++++- .../AsyncSingleRequestRpcRetryingCaller.java | 15 +- .../AsyncSmallScanRpcRetryingCaller.java | 211 +++++++++++++++++ .../hadoop/hbase/client/AsyncTable.java | 23 ++ .../hadoop/hbase/client/AsyncTableImpl.java | 47 +++- .../hadoop/hbase/client/ClientScanner.java | 176 +++++++------- .../client/ClientSmallReversedScanner.java | 6 +- .../hadoop/hbase/client/ConnectionUtils.java | 46 +++- .../hbase/client/ReversedClientScanner.java | 2 + .../client/ScannerCallableWithReplicas.java | 12 +- .../hbase/client/TestAsyncTableSmallScan.java | 219 ++++++++++++++++++ .../hbase/client/TestFromClientSide.java | 3 +- 14 files changed, 781 insertions(+), 126 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java index ba2e660f3dc..aaac8456b3d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java @@ -20,11 +20,18 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; +import static org.apache.hadoop.hbase.HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY; @@ -34,6 +41,7 @@ import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.classification.InterfaceAudience; /** @@ -59,6 +67,13 @@ class 
AsyncConnectionConfiguration { /** How many retries are allowed before we start to log */ private final int startLogErrorsCnt; + private final long scanTimeoutNs; + + private final int scannerCaching; + + private final long scannerMaxResultSize; + + @SuppressWarnings("deprecation") AsyncConnectionConfiguration(Configuration conf) { this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos( conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); @@ -68,11 +83,18 @@ class AsyncConnectionConfiguration { conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT))); this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT))); - this.pauseNs = TimeUnit.MILLISECONDS - .toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE)); + this.pauseNs = + TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE)); this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, - DEFAULT_START_LOG_ERRORS_AFTER_COUNT); + this.startLogErrorsCnt = + conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); + this.scanTimeoutNs = TimeUnit.MILLISECONDS + .toNanos(HBaseConfiguration.getInt(conf, HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD)); + this.scannerCaching = + conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING); + this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, + DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); } long getMetaOperationTimeoutNs() { @@ -103,4 +125,15 @@ class AsyncConnectionConfiguration { return startLogErrorsCnt; } + public long getScanTimeoutNs() { + return scanTimeoutNs; + } + + public int getScannerCaching() { + return scannerCaching; + } + + public long getScannerMaxResultSize() { + return scannerMaxResultSize; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java index dc75ba6d8e5..321fd7166aa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.*; + import java.io.Closeable; import java.io.IOException; import java.util.concurrent.CompletableFuture; @@ -27,6 +29,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; /** * TODO: reimplement using aync connection when the scan logic is ready. 
The current implementation @@ -52,6 +55,26 @@ class AsyncRegionLocator implements Closeable { return future; } + CompletableFuture<HRegionLocation> getPreviousRegionLocation(TableName tableName, + byte[] startRowOfCurrentRegion, boolean reload) { + CompletableFuture<HRegionLocation> future = new CompletableFuture<>(); + byte[] toLocateRow = createClosestRowBefore(startRowOfCurrentRegion); + try { + for (;;) { + HRegionLocation loc = conn.getRegionLocation(tableName, toLocateRow, reload); + byte[] endKey = loc.getRegionInfo().getEndKey(); + if (Bytes.equals(startRowOfCurrentRegion, endKey)) { + future.complete(loc); + break; + } + toLocateRow = endKey; + } + } catch (IOException e) { + future.completeExceptionally(e); + } + return future; + } + void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row, Object exception, ServerName source) { conn.updateCachedLocations(tableName, regionName, row, exception, source); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index c5ac9a5f16d..9020ce50ceb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -17,10 +17,12 @@ */ package org.apache.hadoop.hbase.client; -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import io.netty.util.HashedWheelTimer; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -54,6 +56,8 @@ class AsyncRpcRetryingCallerFactory { private long rpcTimeoutNs = -1L; + private boolean locateToPreviousRegion; + public SingleRequestCallerBuilder<T> table(TableName tableName) { this.tableName = tableName; return this; } @@ -64,8 +68,8 @@ class AsyncRpcRetryingCallerFactory { return this; } - public SingleRequestCallerBuilder<T> action( - AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) { + public SingleRequestCallerBuilder<T> + action(AsyncSingleRequestRpcRetryingCaller.Callable<T> callable) { this.callable = callable; return this; } @@ -80,11 +84,18 @@ class AsyncRpcRetryingCallerFactory { return this; } + public SingleRequestCallerBuilder<T> locateToPreviousRegion(boolean locateToPreviousRegion) { + this.locateToPreviousRegion = locateToPreviousRegion; + return this; + } + public AsyncSingleRequestRpcRetryingCaller<T> build() { return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, - Preconditions.checkNotNull(tableName, "tableName is null"), - Preconditions.checkNotNull(row, "row is null"), - Preconditions.checkNotNull(callable, "action is null"), conn.connConf.getPauseNs(), + checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"), + locateToPreviousRegion + ?
(c, tn, r, re) -> c.getLocator().getPreviousRegionLocation(tn, r, re) + : (c, tn, r, re) -> c.getLocator().getRegionLocation(tn, r, re), + checkNotNull(callable, "action is null"), conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs, rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt()); } @@ -103,4 +114,64 @@ class AsyncRpcRetryingCallerFactory { public <T> SingleRequestCallerBuilder<T> single() { return new SingleRequestCallerBuilder<>(); } + + public class SmallScanCallerBuilder { + + private TableName tableName; + + private Scan scan; + + private int limit; + + private long scanTimeoutNs = -1L; + + private long rpcTimeoutNs = -1L; + + public SmallScanCallerBuilder table(TableName tableName) { + this.tableName = tableName; + return this; + } + + public SmallScanCallerBuilder setScan(Scan scan) { + this.scan = scan; + return this; + } + + public SmallScanCallerBuilder limit(int limit) { + this.limit = limit; + return this; + } + + public SmallScanCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) { + this.scanTimeoutNs = unit.toNanos(scanTimeout); + return this; + } + + public SmallScanCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { + this.rpcTimeoutNs = unit.toNanos(rpcTimeout); + return this; + } + + public AsyncSmallScanRpcRetryingCaller build() { + TableName tableName = checkNotNull(this.tableName, "tableName is null"); + Scan scan = checkNotNull(this.scan, "scan is null"); + checkArgument(limit > 0, "invalid limit %d", limit); + return new AsyncSmallScanRpcRetryingCaller(conn, tableName, scan, limit, scanTimeoutNs, + rpcTimeoutNs); + } + + /** + * Shortcut for {@code build().call()} + */ + public CompletableFuture<List<Result>> call() { + return build().call(); + } + } + + /** + * Create retry caller for small scan. + */ + public SmallScanCallerBuilder smallScan() { + return new SmallScanCallerBuilder(); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java index 8acde94662a..1d0357dffbf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java @@ -60,6 +60,12 @@ class AsyncSingleRequestRpcRetryingCaller<T> { ClientService.Interface stub); } + @FunctionalInterface + public interface RegionLocator { + CompletableFuture<HRegionLocation> locate(AsyncConnectionImpl conn, TableName tableName, + byte[] row, boolean reload); + } + private final HashedWheelTimer retryTimer; private final AsyncConnectionImpl conn; @@ -68,6 +74,8 @@ class AsyncSingleRequestRpcRetryingCaller<T> { private final byte[] row; + private final RegionLocator locator; + private final Callable<T> callable; private final long pauseNs; @@ -89,12 +97,13 @@ class AsyncSingleRequestRpcRetryingCaller<T> { private final long startNs; public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, - TableName tableName, byte[] row, Callable<T> callable, long pauseNs, int maxRetries, - long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + TableName tableName, byte[] row, RegionLocator locator, Callable<T> callable, long pauseNs, + int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; this.conn = conn; this.tableName = tableName; this.row = row; + this.locator = locator; this.callable = callable; this.pauseNs =
pauseNs; this.maxAttempts = retries2Attempts(maxRetries); @@ -207,7 +216,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> { } private void locateThenCall() { - conn.getLocator().getRegionLocation(tableName, row, tries > 1).whenComplete((loc, error) -> { + locator.locate(conn, tableName, row, tries > 1).whenComplete((loc, error) -> { if (error != null) { onError(error, () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed, tries = " diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java new file mode 100644 index 00000000000..af639c0854f --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow; +import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Retry caller for small scan.
+ */ @InterfaceAudience.Private class AsyncSmallScanRpcRetryingCaller { + + private final AsyncConnectionImpl conn; + + private final TableName tableName; + + private final Scan scan; + + private final int limit; + + private final long scanTimeoutNs; + + private final long rpcTimeoutNs; + + private final Function<byte[], byte[]> createClosestNextRow; + + private final Runnable firstScan; + + private final Function<HRegionInfo, Boolean> nextScan; + + private final List<Result> resultList; + + private final CompletableFuture<List<Result>> future; + + public AsyncSmallScanRpcRetryingCaller(AsyncConnectionImpl conn, TableName tableName, Scan scan, + int limit, long scanTimeoutNs, long rpcTimeoutNs) { + this.conn = conn; + this.tableName = tableName; + this.scan = scan; + this.limit = limit; + this.scanTimeoutNs = scanTimeoutNs; + this.rpcTimeoutNs = rpcTimeoutNs; + if (scan.isReversed()) { + this.createClosestNextRow = ConnectionUtils::createClosestRowBefore; + this.firstScan = this::reversedFirstScan; + this.nextScan = this::reversedNextScan; + } else { + this.createClosestNextRow = ConnectionUtils::createClosestRowAfter; + this.firstScan = this::firstScan; + this.nextScan = this::nextScan; + } + this.resultList = new ArrayList<>(); + this.future = new CompletableFuture<>(); + } + + private static final class SmallScanResponse { + + public final Result[] results; + + public final HRegionInfo currentRegion; + + public final boolean hasMoreResultsInRegion; + + public SmallScanResponse(Result[] results, HRegionInfo currentRegion, + boolean hasMoreResultsInRegion) { + this.results = results; + this.currentRegion = currentRegion; + this.hasMoreResultsInRegion = hasMoreResultsInRegion; + } + } + + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", + justification = "Findbugs seems to be confused by lambda expression.") + private CompletableFuture<SmallScanResponse> scan(HBaseRpcController controller, + HRegionLocation loc, ClientService.Interface stub) { + CompletableFuture<SmallScanResponse> future = new CompletableFuture<>(); + ScanRequest req; + try { + req = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), scan, + limit - resultList.size(), true); + } catch (IOException e) { + future.completeExceptionally(e); + return future; + } + stub.scan(controller, req, resp -> { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + } else { + try { + Result[] results = ResponseConverter.getResults(controller.cellScanner(), resp); + future.complete( + new SmallScanResponse(results, loc.getRegionInfo(), resp.getMoreResultsInRegion())); + } catch (IOException e) { + future.completeExceptionally(e); + } + } + }); + return future; + } + + private void onComplete(SmallScanResponse resp) { + resultList.addAll(Arrays.asList(resp.results)); + if (resultList.size() == limit) { + future.complete(resultList); + return; + } + if (resp.hasMoreResultsInRegion) { + if (resp.results.length > 0) { + scan.setStartRow( + createClosestNextRow.apply(resp.results[resp.results.length - 1].getRow())); + } + scan(false); + return; + } + if (!nextScan.apply(resp.currentRegion)) { + future.complete(resultList); + } + } + + private void scan(boolean locateToPreviousRegion) { + conn.callerFactory.
<SmallScanResponse> single().table(tableName).row(scan.getStartRow()) + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS) + .locateToPreviousRegion(locateToPreviousRegion).action(this::scan).call() + .whenComplete((resp, error) -> { + if (error != null) { + future.completeExceptionally(error); + } else { + onComplete(resp); + } + }); + } + + public CompletableFuture<List<Result>> call() { + firstScan.run(); + return future; + } + + private void firstScan() { + scan(false); + } + + private void reversedFirstScan() { + scan(isEmptyStartRow(scan.getStartRow())); + } + + private boolean nextScan(HRegionInfo region) { + if (isEmptyStopRow(scan.getStopRow())) { + if (isEmptyStopRow(region.getEndKey())) { + return false; + } + } else { + if (Bytes.compareTo(region.getEndKey(), scan.getStopRow()) >= 0) { + return false; + } + } + scan.setStartRow(region.getEndKey()); + scan(false); + return true; + } + + private boolean reversedNextScan(HRegionInfo region) { + if (isEmptyStopRow(scan.getStopRow())) { + if (isEmptyStartRow(region.getStartKey())) { + return false; + } + } else { + if (Bytes.compareTo(region.getStartKey(), scan.getStopRow()) <= 0) { + return false; + } + } + scan.setStartRow(region.getStartKey()); + scan(true); + return true; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index 46427463675..94747b9144e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client; import com.google.common.base.Preconditions; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -312,4 +313,26 @@ public interface AsyncTable { */ CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, byte[] value, RowMutations mutation); + + /** + * Just call {@link #smallScan(Scan, int)} with {@link Integer#MAX_VALUE}. + * @see #smallScan(Scan, int) + */ + default CompletableFuture<List<Result>> smallScan(Scan scan) { + return smallScan(scan, Integer.MAX_VALUE); + } + + /** + * Return all the results that match the given scan object. The number of the returned results + * will not be greater than {@code limit}. + *
<p>
+ Notice that the scan must be small, and should not use batch or allowPartialResults. The + {@code caching} property of the scan object is also ignored as we will use {@code limit} + instead. + @param scan A configured {@link Scan} object. + @param limit the limit of results count + @return The results of this small scan operation. The return value will be wrapped by a + {@link CompletableFuture}. + */ + CompletableFuture<List<Result>> smallScan(Scan scan, int limit); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index 77a5bbef0ed..ce53775c50b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; import java.io.IOException; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResp import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ReflectionUtils; /** * The implementation of AsyncTable. @@ -57,12 +59,18 @@ class AsyncTableImpl implements AsyncTable { private final TableName tableName; + private final int defaultScannerCaching; + + private final long defaultScannerMaxResultSize; + private long readRpcTimeoutNs; private long writeRpcTimeoutNs; private long operationTimeoutNs; + private long scanTimeoutNs; + public AsyncTableImpl(AsyncConnectionImpl conn, TableName tableName) { this.conn = conn; this.tableName = tableName; @@ -70,6 +78,9 @@ class AsyncTableImpl implements AsyncTable { this.writeRpcTimeoutNs = conn.connConf.getWriteRpcTimeoutNs(); this.operationTimeoutNs = tableName.isSystemTable() ? conn.connConf.getMetaOperationTimeoutNs() : conn.connConf.getOperationTimeoutNs(); + this.defaultScannerCaching = conn.connConf.getScannerCaching(); + this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); + this.scanTimeoutNs = conn.connConf.getScanTimeoutNs(); } @Override @@ -256,8 +267,8 @@ class AsyncTableImpl implements AsyncTable { future.completeExceptionally(controller.getFailed()); } else { try { - org.apache.hadoop.hbase.client.MultiResponse multiResp = ResponseConverter - .getResults(req, resp, controller.cellScanner()); + org.apache.hadoop.hbase.client.MultiResponse multiResp = + ResponseConverter.getResults(req, resp, controller.cellScanner()); Throwable ex = multiResp.getException(regionName); if (ex != null) { future @@ -305,6 +316,38 @@ class AsyncTableImpl implements AsyncTable { .call(); } + private <T> CompletableFuture<T> failedFuture(Throwable error) { + CompletableFuture<T> future = new CompletableFuture<>(); + future.completeExceptionally(error); + return future; + } + + private Scan setDefaultScanConfig(Scan scan) { + // always create a new scan object as we may reset the start row later.
+ Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan); + if (newScan.getCaching() <= 0) { + newScan.setCaching(defaultScannerCaching); + } + if (newScan.getMaxResultSize() <= 0) { + newScan.setMaxResultSize(defaultScannerMaxResultSize); + } + return newScan; + } + + @Override + public CompletableFuture<List<Result>> smallScan(Scan scan, int limit) { + if (!scan.isSmall()) { + return failedFuture(new IllegalArgumentException("Only small scan is allowed")); + } + if (scan.getBatch() > 0 || scan.getAllowPartialResults()) { + return failedFuture( + new IllegalArgumentException("Batch and allowPartial are not allowed for small scan")); + } + return conn.callerFactory.smallScan().table(tableName).setScan(setDefaultScanConfig(scan)) + .limit(limit).scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS) + .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call(); + } + @Override public void setReadRpcTimeout(long timeout, TimeUnit unit) { this.readRpcTimeoutNs = unit.toNanos(timeout); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 371a68aea9e..00ff3500f10 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -17,7 +17,20 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter; +import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore; + import com.google.common.annotations.VisibleForTesting; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ExecutorService; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -35,20 +48,11 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos; -import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.util.Bytes; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ExecutorService; - /** * Implements the scanner interface for the HBase client. * If there are multiple regions in a table, this scanner will iterate @@ -56,53 +60,51 @@ import java.util.concurrent.ExecutorService; */ @InterfaceAudience.Private public abstract class ClientScanner extends AbstractClientScanner { - private static final Log LOG = LogFactory.getLog(ClientScanner.class); - // A byte array in which all elements are the max byte, and it is used to - // construct closest front row - static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9); - protected Scan scan; - protected boolean closed = false; - // Current region scanner is against.
Gets cleared if current region goes - // wonky: e.g. if it splits on us. - protected HRegionInfo currentRegion = null; - protected ScannerCallableWithReplicas callable = null; - protected Queue<Result> cache; - /** - * A list of partial results that have been returned from the server. This list should only - * contain results if this scanner does not have enough partial results to form the complete - * result. - */ - protected final LinkedList<Result> partialResults = new LinkedList<Result>(); - /** - * The row for which we are accumulating partial Results (i.e. the row of the Results stored - * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync - * via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()} - */ - protected byte[] partialResultsRow = null; - /** - * The last cell from a not full Row which is added to cache - */ - protected Cell lastCellLoadedToCache = null; - protected final int caching; - protected long lastNext; - // Keep lastResult returned successfully in case we have to reset scanner. - protected Result lastResult = null; - protected final long maxScannerResultSize; - private final ClusterConnection connection; - private final TableName tableName; - protected final int scannerTimeout; - protected boolean scanMetricsPublished = false; - protected RpcRetryingCaller<Result[]> caller; - protected RpcControllerFactory rpcControllerFactory; - protected Configuration conf; - //The timeout on the primary. Applicable if there are multiple replicas for a region - //In that case, we will only wait for this much timeout on the primary before going - //to the replicas and trying the same scan. Note that the retries will still happen - //on each replica and the first successful results will be taken. A timeout of 0 is - //disallowed. - protected final int primaryOperationTimeout; - private int retries; - protected final ExecutorService pool; + private static final Log LOG = LogFactory.getLog(ClientScanner.class); + + protected Scan scan; + protected boolean closed = false; + // Current region scanner is against. Gets cleared if current region goes + // wonky: e.g. if it splits on us. + protected HRegionInfo currentRegion = null; + protected ScannerCallableWithReplicas callable = null; + protected Queue<Result> cache; + /** + * A list of partial results that have been returned from the server. This list should only + * contain results if this scanner does not have enough partial results to form the complete + * result. + */ + protected final LinkedList<Result> partialResults = new LinkedList<Result>(); + /** + * The row for which we are accumulating partial Results (i.e. the row of the Results stored + * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync via + * the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()} + */ + protected byte[] partialResultsRow = null; + /** + * The last cell from a not full Row which is added to cache + */ + protected Cell lastCellLoadedToCache = null; + protected final int caching; + protected long lastNext; + // Keep lastResult returned successfully in case we have to reset scanner.
+ protected Result lastResult = null; + protected final long maxScannerResultSize; + private final ClusterConnection connection; + private final TableName tableName; + protected final int scannerTimeout; + protected boolean scanMetricsPublished = false; + protected RpcRetryingCaller<Result[]> caller; + protected RpcControllerFactory rpcControllerFactory; + protected Configuration conf; + // The timeout on the primary. Applicable if there are multiple replicas for a region + // In that case, we will only wait for this much timeout on the primary before going + // to the replicas and trying the same scan. Note that the retries will still happen + // on each replica and the first successful results will be taken. A timeout of 0 is + // disallowed. + protected final int primaryOperationTimeout; + private int retries; + protected final ExecutorService pool; /** * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start @@ -447,7 +449,7 @@ public abstract class ClientScanner extends AbstractClientScanner { if (scan.isReversed()) { scan.setStartRow(createClosestRowBefore(lastResult.getRow())); } else { - scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1])); + scan.setStartRow(createClosestRowAfter(lastResult.getRow())); } } else { // we need rescan this row because we only loaded partial row before @@ -737,49 +739,27 @@ public abstract class ClientScanner extends AbstractClientScanner { } } - @Override - public void close() { - if (!scanMetricsPublished) writeScanMetrics(); - if (callable != null) { - callable.setClose(); - try { - call(callable, caller, scannerTimeout); - } catch (UnknownScannerException e) { - // We used to catch this error, interpret, and rethrow. However, we - // have since decided that it's not nice for a scanner's close to - // throw exceptions. Chances are it was just due to lease time out. - if (LOG.isDebugEnabled()) { - LOG.debug("scanner failed to close", e); - } - } catch (IOException e) { - /* An exception other than UnknownScanner is unexpected. */ - LOG.warn("scanner failed to close.", e); + @Override + public void close() { + if (!scanMetricsPublished) writeScanMetrics(); + if (callable != null) { + callable.setClose(); + try { + call(callable, caller, scannerTimeout); + } catch (UnknownScannerException e) { + // We used to catch this error, interpret, and rethrow. However, we + // have since decided that it's not nice for a scanner's close to + // throw exceptions. Chances are it was just due to lease time out. + if (LOG.isDebugEnabled()) { + LOG.debug("scanner failed to close", e); } + } catch (IOException e) { + /* An exception other than UnknownScanner is unexpected.
*/ + LOG.warn("scanner failed to close.", e); } - closed = true; - } - - /** - * Create the closest row before the specified row - * @param row - * @return a new byte array which is the closest front row of the specified one - */ - protected static byte[] createClosestRowBefore(byte[] row) { - if (row == null) { - throw new IllegalArgumentException("The passed row is empty"); - } - if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) { - return MAX_BYTE_ARRAY; - } - if (row[row.length - 1] == 0) { - return Arrays.copyOf(row, row.length - 1); - } else { - byte[] closestFrontRow = Arrays.copyOf(row, row.length); - closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1); - closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY); - return closestFrontRow; + callable = null; } + closed = true; } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java index 5fac93a232b..971a2ca5cdf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java @@ -16,9 +16,11 @@ * License for the specific language governing permissions and limitations * under the License. */ - package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore; + +import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.concurrent.ExecutorService; @@ -36,8 +38,6 @@ import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFac import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.Bytes; -import com.google.common.annotations.VisibleForTesting; - /** *
<p>
* Client scanner for small reversed scan. Generally, only one RPC is called to fetch the diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index eca9ad8d693..e0030e81bb7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -17,12 +17,16 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; +import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Arrays; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadLocalRandom; @@ -33,10 +37,11 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.util.Bytes; /** * Utility used by client connections. @@ -222,4 +227,41 @@ public final class ConnectionUtils { return HConstants.NO_NONCE; } }; + + // A byte array in which all elements are the max byte, and it is used to + // construct closest front row + static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9); + + /** + * Create the closest row after the specified row + */ + static byte[] createClosestRowAfter(byte[] row) { + return Arrays.copyOf(row, row.length + 1); + } + + /** + * Create the closest row before the specified row + */ + static byte[] createClosestRowBefore(byte[] row) { + if (row.length == 0) { + return MAX_BYTE_ARRAY; + } + if (row[row.length - 1] == 0) { + return Arrays.copyOf(row, row.length - 1); + } else { + byte[] nextRow = new byte[row.length + MAX_BYTE_ARRAY.length]; + System.arraycopy(row, 0, nextRow, 0, row.length - 1); + nextRow[row.length - 1] = (byte) ((row[row.length - 1] & 0xFF) - 1); + System.arraycopy(MAX_BYTE_ARRAY, 0, nextRow, row.length, MAX_BYTE_ARRAY.length); + return nextRow; + } + } + + static boolean isEmptyStartRow(byte[] row) { + return Bytes.equals(row, EMPTY_START_ROW); + } + + static boolean isEmptyStopRow(byte[] row) { + return Bytes.equals(row, EMPTY_END_ROW); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java index dde82ba5447..390e2362d6d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore; + import java.io.IOException; import java.util.concurrent.ExecutorService; diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 5174598c1e4..e04fd6e0bde 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -18,7 +18,10 @@ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefore; +import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter; +import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore; + +import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.io.InterruptedIOException; @@ -30,7 +33,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -40,12 +42,8 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; -import com.google.common.annotations.VisibleForTesting; - /** * This class has the logic for handling scanners for regions with and without replicas. * 1. A scan is attempted on the default (primary) region @@ -341,7 +339,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { if (callable.getScan().isReversed()) { callable.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow())); } else { - callable.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1])); + callable.getScan().setStartRow(createClosestRowAfter(this.lastResult.getRow())); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java new file mode 100644 index 00000000000..972780e4e72 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.IntStream; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncTableSmallScan { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("async"); + + private static byte[] FAMILY = Bytes.toBytes("cf"); + + private static byte[] QUALIFIER = Bytes.toBytes("cq"); + + private static int COUNT = 1000; + + private static AsyncConnection ASYNC_CONN; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.startMiniCluster(3); + byte[][] splitKeys = new byte[8][]; + for (int i = 111; i < 999; i += 111) { + splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); + } + TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys); + TEST_UTIL.waitTableAvailable(TABLE_NAME); + ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()); + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + List<CompletableFuture<Void>> futures = new ArrayList<>(); + IntStream.range(0, COUNT) + .forEach(i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i))) + .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))))); + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); + } + + @AfterClass + public static void tearDown() throws Exception { + ASYNC_CONN.close(); + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testScanAll() throws InterruptedException, ExecutionException { + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + List<Result> results = table.smallScan(new Scan().setSmall(true)).get(); + assertEquals(COUNT, results.size()); + IntStream.range(0, COUNT).forEach(i -> { + Result result = results.get(i); + assertEquals(String.format("%03d", i), Bytes.toString(result.getRow())); + assertEquals(i, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); + }); + } + + @Test + public void testReversedScanAll() throws InterruptedException, ExecutionException { + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + List<Result> results = table.smallScan(new Scan().setSmall(true).setReversed(true)).get(); + assertEquals(COUNT, results.size()); + IntStream.range(0, COUNT).forEach(i -> { + Result result = results.get(i); + int actualIndex = COUNT - i - 1; + assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow())); + assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); + }); + } + + @Test + public void testScanNoStopKey() throws InterruptedException, ExecutionException { + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + int start = 345; + List<Result> results = table + .smallScan(new Scan(Bytes.toBytes(String.format("%03d", start))).setSmall(true)).get(); + assertEquals(COUNT - start, results.size()); + IntStream.range(0, COUNT - start).forEach(i -> { + Result result = results.get(i); + int actualIndex = start + i;
assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow())); + assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); + }); + } + + @Test + public void testReverseScanNoStopKey() throws InterruptedException, ExecutionException { + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + int start = 765; + List<Result> results = table + .smallScan( + new Scan(Bytes.toBytes(String.format("%03d", start))).setSmall(true).setReversed(true)) + .get(); + assertEquals(start + 1, results.size()); + IntStream.range(0, start + 1).forEach(i -> { + Result result = results.get(i); + int actualIndex = start - i; + assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow())); + assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); + }); + } + + private void testScan(int start, int stop) throws InterruptedException, ExecutionException { + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + List<Result> results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start))) + .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true)).get(); + assertEquals(stop - start, results.size()); + IntStream.range(0, stop - start).forEach(i -> { + Result result = results.get(i); + int actualIndex = start + i; + assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow())); + assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); + }); + } + + private void testReversedScan(int start, int stop) + throws InterruptedException, ExecutionException { + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + List<Result> results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start))) + .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true).setReversed(true)) + .get(); + assertEquals(start - stop, results.size()); + IntStream.range(0, start - stop).forEach(i -> { + Result result = results.get(i); + int actualIndex = start - i; + assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow())); + assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); + }); + } + + @Test + public void testScanWithStartKeyAndStopKey() throws InterruptedException, ExecutionException { + testScan(345, 567); + } + + @Test + public void testReversedScanWithStartKeyAndStopKey() + throws InterruptedException, ExecutionException { + testReversedScan(765, 543); + } + + @Test + public void testScanAtRegionBoundary() throws InterruptedException, ExecutionException { + testScan(222, 333); + } + + @Test + public void testReversedScanAtRegionBoundary() throws InterruptedException, ExecutionException { + testReversedScan(333, 222); + } + + @Test + public void testScanWithLimit() throws InterruptedException, ExecutionException { + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + int start = 111; + int stop = 888; + int limit = 300; + List<Result> results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start))) + .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true), + limit).get(); + assertEquals(limit, results.size()); + IntStream.range(0, limit).forEach(i -> { + Result result = results.get(i); + int actualIndex = start + i; + assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow())); + assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); + }); + } + + @Test + public void testReversedScanWithLimit() throws InterruptedException, ExecutionException { + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME);
int start = 888; + int stop = 111; + int limit = 300; + List<Result> results = table.smallScan( + new Scan(Bytes.toBytes(String.format("%03d", start))) + .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true).setReversed(true), + limit).get(); + assertEquals(limit, results.size()); + IntStream.range(0, limit).forEach(i -> { + Result result = results.get(i); + int actualIndex = start - i; + assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow())); + assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); + }); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 89841a93da5..ae93e678788 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -26,6 +26,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -5933,7 +5934,7 @@ public class TestFromClientSide { public void testReversedScanUnderMultiRegions() throws Exception { // Test Initialization. TableName TABLE = TableName.valueOf("testReversedScanUnderMultiRegions"); - byte[] maxByteArray = ReversedClientScanner.MAX_BYTE_ARRAY; + byte[] maxByteArray = ConnectionUtils.MAX_BYTE_ARRAY; byte[][] splitRows = new byte[][] { Bytes.toBytes("005"), Bytes.add(Bytes.toBytes("005"), Bytes.multiple(maxByteArray, 16)), Bytes.toBytes("006"),
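
[Editor's note, not part of the patch] For reviewers trying out the new API, a minimal usage sketch of the AsyncTable.smallScan() method added above. The table name, row keys, and limit are hypothetical, and error handling is omitted:

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class SmallScanExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf)) {
      AsyncTable table = conn.getTable(TableName.valueOf("test"));
      // The scan must be explicitly marked small; batch and allowPartialResults
      // are rejected by AsyncTableImpl.smallScan() with an IllegalArgumentException.
      Scan scan = new Scan(Bytes.toBytes("000")).setStopRow(Bytes.toBytes("100")).setSmall(true);
      // Fetch at most 50 rows. The future completes with all results at once,
      // so this is only suitable for scans known to return little data.
      List<Result> results = table.smallScan(scan, 50).get();
      for (Result r : results) {
        System.out.println(Bytes.toString(r.getRow()));
      }
    }
  }
}

Because the whole result set is buffered in the returned list, the limit argument (or a stop row) is the caller's safeguard against unbounded memory use.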
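[Editor's note, not part of the patch] AsyncConnectionConfiguration above falls back to the standard client scanner settings when a Scan does not specify its own. A sketch of overriding those defaults, using the HConstants keys the patch itself imports; the values shown are arbitrary:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class ScanDefaultsExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Overall scan timeout in milliseconds, surfaced through getScanTimeoutNs().
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 60000);
    // Default caching applied by setDefaultScanConfig() when a Scan sets none.
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100);
    // Default cap on result bytes per RPC, see getScannerMaxResultSize().
    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 2L * 1024 * 1024);
  }
}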
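[Editor's note, not part of the patch] The row-boundary helpers moved into ConnectionUtils are the heart of the region-hopping logic: createClosestRowAfter() resumes a forward scan just past the last row seen, while createClosestRowBefore() lets getPreviousRegionLocation() and the reversed scanners step backwards across a region boundary. They are package-private, so the standalone sketch below mirrors their (corrected) logic rather than calling them directly:

import java.util.Arrays;

public class ClosestRowDemo {
  // Nine 0xff bytes, as in ConnectionUtils.MAX_BYTE_ARRAY.
  static final byte[] MAX_BYTE_ARRAY = new byte[9];
  static {
    Arrays.fill(MAX_BYTE_ARRAY, (byte) 0xff);
  }

  // Smallest row sorting strictly after the given row: append a 0x00 byte.
  static byte[] closestRowAfter(byte[] row) {
    return Arrays.copyOf(row, row.length + 1);
  }

  // Largest practically representable row sorting strictly before the given
  // row: strip a trailing 0x00, otherwise decrement the last byte and pad
  // with 0xff bytes.
  static byte[] closestRowBefore(byte[] row) {
    if (row.length == 0) {
      return MAX_BYTE_ARRAY;
    }
    if (row[row.length - 1] == 0) {
      return Arrays.copyOf(row, row.length - 1);
    }
    byte[] nextRow = new byte[row.length + MAX_BYTE_ARRAY.length];
    System.arraycopy(row, 0, nextRow, 0, row.length - 1);
    nextRow[row.length - 1] = (byte) ((row[row.length - 1] & 0xFF) - 1);
    System.arraycopy(MAX_BYTE_ARRAY, 0, nextRow, row.length, MAX_BYTE_ARRAY.length);
    return nextRow;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(closestRowAfter(new byte[] { 1, 2 })));  // [1, 2, 0]
    System.out.println(Arrays.toString(closestRowBefore(new byte[] { 1, 2 }))); // [1, 1, -1, ...]
  }
}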