From 5cee6a39c21966bd82f5778f55295559cd663a31 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Wed, 26 Oct 2016 17:22:50 +0800 Subject: [PATCH] Revert "Implement small scan" due to miss issue number This reverts commit c7c45f2c85cddd860a293fe9364b2b7ab0ab5bba. --- .../client/AsyncConnectionConfiguration.java | 41 +--- .../hbase/client/AsyncRegionLocator.java | 23 -- .../client/AsyncRpcRetryingCallerFactory.java | 83 +------ .../AsyncSingleRequestRpcRetryingCaller.java | 15 +- .../AsyncSmallScanRpcRetryingCaller.java | 211 ----------------- .../hadoop/hbase/client/AsyncTable.java | 23 -- .../hadoop/hbase/client/AsyncTableImpl.java | 47 +--- .../hadoop/hbase/client/ClientScanner.java | 176 +++++++------- .../client/ClientSmallReversedScanner.java | 6 +- .../hadoop/hbase/client/ConnectionUtils.java | 46 +--- .../hbase/client/ReversedClientScanner.java | 2 - .../client/ScannerCallableWithReplicas.java | 12 +- .../hbase/client/TestAsyncTableSmallScan.java | 219 ------------------ .../hbase/client/TestFromClientSide.java | 3 +- 14 files changed, 126 insertions(+), 781 deletions(-) delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java index aaac8456b3d..ba2e660f3dc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java @@ -20,18 +20,11 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_PAUSE; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; -import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING; -import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE; -import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER; -import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING; -import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY; -import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; -import static org.apache.hadoop.hbase.HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY; @@ -41,7 +34,6 @@ import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.classification.InterfaceAudience; /** @@ -67,13 +59,6 @@ class AsyncConnectionConfiguration { /** How many retries are allowed before we start to log */ private final int startLogErrorsCnt; - private final long scanTimeoutNs; - - private final int scannerCaching; - - private final long scannerMaxResultSize; - - @SuppressWarnings("deprecation") AsyncConnectionConfiguration(Configuration conf) { this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos( conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); @@ -83,18 +68,11 @@ class AsyncConnectionConfiguration { conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT))); this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT))); - this.pauseNs = - TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE)); + this.pauseNs = TimeUnit.MILLISECONDS + .toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE)); this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - this.startLogErrorsCnt = - conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); - this.scanTimeoutNs = TimeUnit.MILLISECONDS - .toNanos(HBaseConfiguration.getInt(conf, HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, - HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD)); - this.scannerCaching = - conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING); - this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, - DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); + this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, + DEFAULT_START_LOG_ERRORS_AFTER_COUNT); } long getMetaOperationTimeoutNs() { @@ -125,15 +103,4 @@ class AsyncConnectionConfiguration { return startLogErrorsCnt; } - public long getScanTimeoutNs() { - return scanTimeoutNs; - } - - public int getScannerCaching() { - return scannerCaching; - } - - public long getScannerMaxResultSize() { - return scannerMaxResultSize; - } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java index 321fd7166aa..dc75ba6d8e5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.ConnectionUtils.*; - import java.io.Closeable; import java.io.IOException; import java.util.concurrent.CompletableFuture; @@ -29,7 +27,6 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.Bytes; /** * TODO: reimplement using aync connection when the scan logic is ready. The current implementation @@ -55,26 +52,6 @@ class AsyncRegionLocator implements Closeable { return future; } - CompletableFuture getPreviousRegionLocation(TableName tableName, - byte[] startRowOfCurrentRegion, boolean reload) { - CompletableFuture future = new CompletableFuture<>(); - byte[] toLocateRow = createClosestRowBefore(startRowOfCurrentRegion); - try { - for (;;) { - HRegionLocation loc = conn.getRegionLocation(tableName, toLocateRow, reload); - byte[] endKey = loc.getRegionInfo().getEndKey(); - if (Bytes.equals(startRowOfCurrentRegion, endKey)) { - future.complete(loc); - break; - } - toLocateRow = endKey; - } - } catch (IOException e) { - future.completeExceptionally(e); - } - return future; - } - void updateCachedLocations(TableName tableName, byte[] regionName, byte[] row, Object exception, ServerName source) { conn.updateCachedLocations(tableName, regionName, row, exception, source); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index 9020ce50ceb..c5ac9a5f16d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -17,12 +17,10 @@ */ package org.apache.hadoop.hbase.client; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.base.Preconditions; import io.netty.util.HashedWheelTimer; -import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -56,8 +54,6 @@ class AsyncRpcRetryingCallerFactory { private long rpcTimeoutNs = -1L; - private boolean locateToPreviousRegion; - public SingleRequestCallerBuilder table(TableName tableName) { this.tableName = tableName; return this; @@ -68,8 +64,8 @@ class AsyncRpcRetryingCallerFactory { return this; } - public SingleRequestCallerBuilder - action(AsyncSingleRequestRpcRetryingCaller.Callable callable) { + public SingleRequestCallerBuilder action( + AsyncSingleRequestRpcRetryingCaller.Callable callable) { this.callable = callable; return this; } @@ -84,18 +80,11 @@ class AsyncRpcRetryingCallerFactory { return this; } - public SingleRequestCallerBuilder locateToPreviousRegion(boolean locateToPreviousRegion) { - this.locateToPreviousRegion = locateToPreviousRegion; - return this; - } - public AsyncSingleRequestRpcRetryingCaller build() { return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, - checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"), - locateToPreviousRegion - ? (c, tn, r, re) -> c.getLocator().getPreviousRegionLocation(tn, r, re) - : (c, tn, r, re) -> c.getLocator().getRegionLocation(tn, r, re), - checkNotNull(callable, "action is null"), conn.connConf.getPauseNs(), + Preconditions.checkNotNull(tableName, "tableName is null"), + Preconditions.checkNotNull(row, "row is null"), + Preconditions.checkNotNull(callable, "action is null"), conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs, rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt()); } @@ -114,64 +103,4 @@ class AsyncRpcRetryingCallerFactory { public SingleRequestCallerBuilder single() { return new SingleRequestCallerBuilder<>(); } - - public class SmallScanCallerBuilder { - - private TableName tableName; - - private Scan scan; - - private int limit; - - private long scanTimeoutNs = -1L; - - private long rpcTimeoutNs = -1L; - - public SmallScanCallerBuilder table(TableName tableName) { - this.tableName = tableName; - return this; - } - - public SmallScanCallerBuilder setScan(Scan scan) { - this.scan = scan; - return this; - } - - public SmallScanCallerBuilder limit(int limit) { - this.limit = limit; - return this; - } - - public SmallScanCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) { - this.scanTimeoutNs = unit.toNanos(scanTimeout); - return this; - } - - public SmallScanCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { - this.rpcTimeoutNs = unit.toNanos(rpcTimeout); - return this; - } - - public AsyncSmallScanRpcRetryingCaller build() { - TableName tableName = checkNotNull(this.tableName, "tableName is null"); - Scan scan = checkNotNull(this.scan, "scan is null"); - checkArgument(limit > 0, "invalid limit %d", limit); - return new AsyncSmallScanRpcRetryingCaller(conn, tableName, scan, limit, scanTimeoutNs, - rpcTimeoutNs); - } - - /** - * Shortcut for {@code build().call()} - */ - public CompletableFuture> call() { - return build().call(); - } - } - - /** - * Create retry caller for small scan. - */ - public SmallScanCallerBuilder smallScan() { - return new SmallScanCallerBuilder(); - } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java index 1d0357dffbf..8acde94662a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java @@ -60,12 +60,6 @@ class AsyncSingleRequestRpcRetryingCaller { ClientService.Interface stub); } - @FunctionalInterface - public interface RegionLocator { - CompletableFuture locate(AsyncConnectionImpl conn, TableName tableName, - byte[] row, boolean reload); - } - private final HashedWheelTimer retryTimer; private final AsyncConnectionImpl conn; @@ -74,8 +68,6 @@ class AsyncSingleRequestRpcRetryingCaller { private final byte[] row; - private final RegionLocator locator; - private final Callable callable; private final long pauseNs; @@ -97,13 +89,12 @@ class AsyncSingleRequestRpcRetryingCaller { private final long startNs; public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, - TableName tableName, byte[] row, RegionLocator locator, Callable callable, long pauseNs, - int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + TableName tableName, byte[] row, Callable callable, long pauseNs, int maxRetries, + long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; this.conn = conn; this.tableName = tableName; this.row = row; - this.locator = locator; this.callable = callable; this.pauseNs = pauseNs; this.maxAttempts = retries2Attempts(maxRetries); @@ -216,7 +207,7 @@ class AsyncSingleRequestRpcRetryingCaller { } private void locateThenCall() { - locator.locate(conn, tableName, row, tries > 1).whenComplete((loc, error) -> { + conn.getLocator().getRegionLocation(tableName, row, tries > 1).whenComplete((loc, error) -> { if (error != null) { onError(error, () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed, tries = " diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java deleted file mode 100644 index af639c0854f..00000000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java +++ /dev/null @@ -1,211 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow; -import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; - -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ipc.HBaseRpcController; -import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; -import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; -import org.apache.hadoop.hbase.util.Bytes; - -/** - * Retry caller for smaller scan. - */ -@InterfaceAudience.Private -class AsyncSmallScanRpcRetryingCaller { - - private final AsyncConnectionImpl conn; - - private final TableName tableName; - - private final Scan scan; - - private final int limit; - - private final long scanTimeoutNs; - - private final long rpcTimeoutNs; - - private final Function createClosestNextRow; - - private final Runnable firstScan; - - private final Function nextScan; - - private final List resultList; - - private final CompletableFuture> future; - - public AsyncSmallScanRpcRetryingCaller(AsyncConnectionImpl conn, TableName tableName, Scan scan, - int limit, long scanTimeoutNs, long rpcTimeoutNs) { - this.conn = conn; - this.tableName = tableName; - this.scan = scan; - this.limit = limit; - this.scanTimeoutNs = scanTimeoutNs; - this.rpcTimeoutNs = rpcTimeoutNs; - if (scan.isReversed()) { - this.createClosestNextRow = ConnectionUtils::createClosestRowBefore; - this.firstScan = this::reversedFirstScan; - this.nextScan = this::reversedNextScan; - } else { - this.createClosestNextRow = ConnectionUtils::createClosestRowAfter; - this.firstScan = this::firstScan; - this.nextScan = this::nextScan; - } - this.resultList = new ArrayList<>(); - this.future = new CompletableFuture<>(); - } - - private static final class SmallScanResponse { - - public final Result[] results; - - public final HRegionInfo currentRegion; - - public final boolean hasMoreResultsInRegion; - - public SmallScanResponse(Result[] results, HRegionInfo currentRegion, - boolean hasMoreResultsInRegion) { - this.results = results; - this.currentRegion = currentRegion; - this.hasMoreResultsInRegion = hasMoreResultsInRegion; - } - } - - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", - justification = "Findbugs seems to be confused by lambda expression.") - private CompletableFuture scan(HBaseRpcController controller, - HRegionLocation loc, ClientService.Interface stub) { - CompletableFuture future = new CompletableFuture<>(); - ScanRequest req; - try { - req = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), scan, - limit - resultList.size(), true); - } catch (IOException e) { - future.completeExceptionally(e); - return future; - } - stub.scan(controller, req, resp -> { - if (controller.failed()) { - future.completeExceptionally(controller.getFailed()); - } else { - try { - Result[] results = ResponseConverter.getResults(controller.cellScanner(), resp); - future.complete( - new SmallScanResponse(results, loc.getRegionInfo(), resp.getMoreResultsInRegion())); - } catch (IOException e) { - future.completeExceptionally(e); - } - } - }); - return future; - } - - private void onComplete(SmallScanResponse resp) { - resultList.addAll(Arrays.asList(resp.results)); - if (resultList.size() == limit) { - future.complete(resultList); - return; - } - if (resp.hasMoreResultsInRegion) { - if (resp.results.length > 0) { - scan.setStartRow( - createClosestNextRow.apply(resp.results[resp.results.length - 1].getRow())); - } - scan(false); - return; - } - if (!nextScan.apply(resp.currentRegion)) { - future.complete(resultList); - } - } - - private void scan(boolean locateToPreviousRegion) { - conn.callerFactory. single().table(tableName).row(scan.getStartRow()) - .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS) - .locateToPreviousRegion(locateToPreviousRegion).action(this::scan).call() - .whenComplete((resp, error) -> { - if (error != null) { - future.completeExceptionally(error); - } else { - onComplete(resp); - } - }); - } - - public CompletableFuture> call() { - firstScan.run(); - return future; - } - - private void firstScan() { - scan(false); - } - - private void reversedFirstScan() { - scan(isEmptyStartRow(scan.getStartRow())); - } - - private boolean nextScan(HRegionInfo region) { - if (isEmptyStopRow(scan.getStopRow())) { - if (isEmptyStopRow(region.getEndKey())) { - return false; - } - } else { - if (Bytes.compareTo(region.getEndKey(), scan.getStopRow()) >= 0) { - return false; - } - } - scan.setStartRow(region.getEndKey()); - scan(false); - return true; - } - - private boolean reversedNextScan(HRegionInfo region) { - if (isEmptyStopRow(scan.getStopRow())) { - if (isEmptyStartRow(region.getStartKey())) { - return false; - } - } else { - if (Bytes.compareTo(region.getStartKey(), scan.getStopRow()) <= 0) { - return false; - } - } - scan.setStartRow(region.getStartKey()); - scan(true); - return true; - } -} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index 94747b9144e..46427463675 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client; import com.google.common.base.Preconditions; -import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -313,26 +312,4 @@ public interface AsyncTable { */ CompletableFuture checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, byte[] value, RowMutations mutation); - - /** - * Just call {@link #smallScan(Scan, int)} with {@link Integer#MAX_VALUE}. - * @see #smallScan(Scan, int) - */ - default CompletableFuture> smallScan(Scan scan) { - return smallScan(scan, Integer.MAX_VALUE); - } - - /** - * Return all the results that match the given scan object. The number of the returned results - * will not be greater than {@code limit}. - *

- * Notice that the scan must be small, and should not use batch or allowPartialResults. The - * {@code caching} property of the scan object is also ignored as we will use {@code limit} - * instead. - * @param scan A configured {@link Scan} object. - * @param limit the limit of results count - * @return The results of this small scan operation. The return value will be wrapped by a - * {@link CompletableFuture}. - */ - CompletableFuture> smallScan(Scan scan, int limit); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index ce53775c50b..77a5bbef0ed 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; import java.io.IOException; -import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -47,7 +46,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResp import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ReflectionUtils; /** * The implementation of AsyncTable. @@ -59,18 +57,12 @@ class AsyncTableImpl implements AsyncTable { private final TableName tableName; - private final int defaultScannerCaching; - - private final long defaultScannerMaxResultSize; - private long readRpcTimeoutNs; private long writeRpcTimeoutNs; private long operationTimeoutNs; - private long scanTimeoutNs; - public AsyncTableImpl(AsyncConnectionImpl conn, TableName tableName) { this.conn = conn; this.tableName = tableName; @@ -78,9 +70,6 @@ class AsyncTableImpl implements AsyncTable { this.writeRpcTimeoutNs = conn.connConf.getWriteRpcTimeoutNs(); this.operationTimeoutNs = tableName.isSystemTable() ? conn.connConf.getMetaOperationTimeoutNs() : conn.connConf.getOperationTimeoutNs(); - this.defaultScannerCaching = conn.connConf.getScannerCaching(); - this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); - this.scanTimeoutNs = conn.connConf.getScanTimeoutNs(); } @Override @@ -267,8 +256,8 @@ class AsyncTableImpl implements AsyncTable { future.completeExceptionally(controller.getFailed()); } else { try { - org.apache.hadoop.hbase.client.MultiResponse multiResp = - ResponseConverter.getResults(req, resp, controller.cellScanner()); + org.apache.hadoop.hbase.client.MultiResponse multiResp = ResponseConverter + .getResults(req, resp, controller.cellScanner()); Throwable ex = multiResp.getException(regionName); if (ex != null) { future @@ -316,38 +305,6 @@ class AsyncTableImpl implements AsyncTable { .call(); } - private CompletableFuture failedFuture(Throwable error) { - CompletableFuture future = new CompletableFuture<>(); - future.completeExceptionally(error); - return future; - } - - private Scan setDefaultScanConfig(Scan scan) { - // always create a new scan object as we may reset the start row later. - Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan); - if (newScan.getCaching() <= 0) { - newScan.setCaching(defaultScannerCaching); - } - if (newScan.getMaxResultSize() <= 0) { - newScan.setMaxResultSize(defaultScannerMaxResultSize); - } - return newScan; - } - - @Override - public CompletableFuture> smallScan(Scan scan, int limit) { - if (!scan.isSmall()) { - return failedFuture(new IllegalArgumentException("Only small scan is allowed")); - } - if (scan.getBatch() > 0 || scan.getAllowPartialResults()) { - return failedFuture( - new IllegalArgumentException("Batch and allowPartial is not allowed for small scan")); - } - return conn.callerFactory.smallScan().table(tableName).setScan(setDefaultScanConfig(scan)) - .limit(limit).scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS) - .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call(); - } - @Override public void setReadRpcTimeout(long timeout, TimeUnit unit) { this.readRpcTimeoutNs = unit.toNanos(timeout); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 00ff3500f10..371a68aea9e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -17,20 +17,7 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter; -import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore; - import com.google.common.annotations.VisibleForTesting; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ExecutorService; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -48,11 +35,20 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos; +import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.util.Bytes; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ExecutorService; + /** * Implements the scanner interface for the HBase client. * If there are multiple regions in a table, this scanner will iterate @@ -60,51 +56,53 @@ import org.apache.hadoop.hbase.util.Bytes; */ @InterfaceAudience.Private public abstract class ClientScanner extends AbstractClientScanner { - private static final Log LOG = LogFactory.getLog(ClientScanner.class); - - protected Scan scan; - protected boolean closed = false; - // Current region scanner is against. Gets cleared if current region goes - // wonky: e.g. if it splits on us. - protected HRegionInfo currentRegion = null; - protected ScannerCallableWithReplicas callable = null; - protected Queue cache; - /** - * A list of partial results that have been returned from the server. This list should only - * contain results if this scanner does not have enough partial results to form the complete - * result. - */ - protected final LinkedList partialResults = new LinkedList(); - /** - * The row for which we are accumulating partial Results (i.e. the row of the Results stored - * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync via - * the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()} - */ - protected byte[] partialResultsRow = null; - /** - * The last cell from a not full Row which is added to cache - */ - protected Cell lastCellLoadedToCache = null; - protected final int caching; - protected long lastNext; - // Keep lastResult returned successfully in case we have to reset scanner. - protected Result lastResult = null; - protected final long maxScannerResultSize; - private final ClusterConnection connection; - private final TableName tableName; - protected final int scannerTimeout; - protected boolean scanMetricsPublished = false; - protected RpcRetryingCaller caller; - protected RpcControllerFactory rpcControllerFactory; - protected Configuration conf; - // The timeout on the primary. Applicable if there are multiple replicas for a region - // In that case, we will only wait for this much timeout on the primary before going - // to the replicas and trying the same scan. Note that the retries will still happen - // on each replica and the first successful results will be taken. A timeout of 0 is - // disallowed. - protected final int primaryOperationTimeout; - private int retries; - protected final ExecutorService pool; + private static final Log LOG = LogFactory.getLog(ClientScanner.class); + // A byte array in which all elements are the max byte, and it is used to + // construct closest front row + static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9); + protected Scan scan; + protected boolean closed = false; + // Current region scanner is against. Gets cleared if current region goes + // wonky: e.g. if it splits on us. + protected HRegionInfo currentRegion = null; + protected ScannerCallableWithReplicas callable = null; + protected Queue cache; + /** + * A list of partial results that have been returned from the server. This list should only + * contain results if this scanner does not have enough partial results to form the complete + * result. + */ + protected final LinkedList partialResults = new LinkedList(); + /** + * The row for which we are accumulating partial Results (i.e. the row of the Results stored + * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync + * via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()} + */ + protected byte[] partialResultsRow = null; + /** + * The last cell from a not full Row which is added to cache + */ + protected Cell lastCellLoadedToCache = null; + protected final int caching; + protected long lastNext; + // Keep lastResult returned successfully in case we have to reset scanner. + protected Result lastResult = null; + protected final long maxScannerResultSize; + private final ClusterConnection connection; + private final TableName tableName; + protected final int scannerTimeout; + protected boolean scanMetricsPublished = false; + protected RpcRetryingCaller caller; + protected RpcControllerFactory rpcControllerFactory; + protected Configuration conf; + //The timeout on the primary. Applicable if there are multiple replicas for a region + //In that case, we will only wait for this much timeout on the primary before going + //to the replicas and trying the same scan. Note that the retries will still happen + //on each replica and the first successful results will be taken. A timeout of 0 is + //disallowed. + protected final int primaryOperationTimeout; + private int retries; + protected final ExecutorService pool; /** * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start @@ -449,7 +447,7 @@ public abstract class ClientScanner extends AbstractClientScanner { if (scan.isReversed()) { scan.setStartRow(createClosestRowBefore(lastResult.getRow())); } else { - scan.setStartRow(createClosestRowAfter(lastResult.getRow())); + scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1])); } } else { // we need rescan this row because we only loaded partial row before @@ -739,27 +737,49 @@ public abstract class ClientScanner extends AbstractClientScanner { } } - @Override - public void close() { - if (!scanMetricsPublished) writeScanMetrics(); - if (callable != null) { - callable.setClose(); - try { - call(callable, caller, scannerTimeout); - } catch (UnknownScannerException e) { - // We used to catch this error, interpret, and rethrow. However, we - // have since decided that it's not nice for a scanner's close to - // throw exceptions. Chances are it was just due to lease time out. - if (LOG.isDebugEnabled()) { - LOG.debug("scanner failed to close", e); + @Override + public void close() { + if (!scanMetricsPublished) writeScanMetrics(); + if (callable != null) { + callable.setClose(); + try { + call(callable, caller, scannerTimeout); + } catch (UnknownScannerException e) { + // We used to catch this error, interpret, and rethrow. However, we + // have since decided that it's not nice for a scanner's close to + // throw exceptions. Chances are it was just due to lease time out. + if (LOG.isDebugEnabled()) { + LOG.debug("scanner failed to close", e); + } + } catch (IOException e) { + /* An exception other than UnknownScanner is unexpected. */ + LOG.warn("scanner failed to close.", e); } - } catch (IOException e) { - /* An exception other than UnknownScanner is unexpected. */ - LOG.warn("scanner failed to close.", e); + callable = null; } - callable = null; + closed = true; + } + + /** + * Create the closest row before the specified row + * @param row + * @return a new byte array which is the closest front row of the specified one + */ + protected static byte[] createClosestRowBefore(byte[] row) { + if (row == null) { + throw new IllegalArgumentException("The passed row is empty"); + } + if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) { + return MAX_BYTE_ARRAY; + } + if (row[row.length - 1] == 0) { + return Arrays.copyOf(row, row.length - 1); + } else { + byte[] closestFrontRow = Arrays.copyOf(row, row.length); + closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1); + closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY); + return closestFrontRow; } - closed = true; } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java index 971a2ca5cdf..5fac93a232b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java @@ -16,11 +16,9 @@ * License for the specific language governing permissions and limitations * under the License. */ + package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore; - -import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.concurrent.ExecutorService; @@ -38,6 +36,8 @@ import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFac import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.Bytes; +import com.google.common.annotations.VisibleForTesting; + /** *

* Client scanner for small reversed scan. Generally, only one RPC is called to fetch the diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index e0030e81bb7..eca9ad8d693 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -17,16 +17,12 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; -import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; -import java.util.Arrays; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadLocalRandom; @@ -37,11 +33,10 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; /** * Utility used by client connections. @@ -227,41 +222,4 @@ public final class ConnectionUtils { return HConstants.NO_NONCE; } }; - - // A byte array in which all elements are the max byte, and it is used to - // construct closest front row - static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9); - - /** - * Create the closest row after the specified row - */ - static byte[] createClosestRowAfter(byte[] row) { - return Arrays.copyOf(row, row.length + 1); - } - - /** - * Create the closest row before the specified row - */ - static byte[] createClosestRowBefore(byte[] row) { - if (row.length == 0) { - return MAX_BYTE_ARRAY; - } - if (row[row.length - 1] == 0) { - return Arrays.copyOf(row, row.length - 1); - } else { - byte[] nextRow = new byte[row.length + MAX_BYTE_ARRAY.length]; - System.arraycopy(row, 0, nextRow, 0, row.length - 1); - nextRow[row.length - 1] = (byte) ((row[row.length - 1] & 0xFF) - 1); - System.arraycopy(nextRow, row.length, MAX_BYTE_ARRAY, 0, MAX_BYTE_ARRAY.length); - return nextRow; - } - } - - static boolean isEmptyStartRow(byte[] row) { - return Bytes.equals(row, EMPTY_START_ROW); - } - - static boolean isEmptyStopRow(byte[] row) { - return Bytes.equals(row, EMPTY_END_ROW); - } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java index 390e2362d6d..dde82ba5447 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java @@ -18,8 +18,6 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore; - import java.io.IOException; import java.util.concurrent.ExecutorService; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index e04fd6e0bde..5174598c1e4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -18,10 +18,7 @@ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter; -import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore; - -import com.google.common.annotations.VisibleForTesting; +import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefore; import java.io.IOException; import java.io.InterruptedIOException; @@ -33,6 +30,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -42,8 +40,12 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; +import com.google.common.annotations.VisibleForTesting; + /** * This class has the logic for handling scanners for regions with and without replicas. * 1. A scan is attempted on the default (primary) region @@ -339,7 +341,7 @@ class ScannerCallableWithReplicas implements RetryingCallable { if (callable.getScan().isReversed()) { callable.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow())); } else { - callable.getScan().setStartRow(createClosestRowAfter(this.lastResult.getRow())); + callable.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1])); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java deleted file mode 100644 index 972780e4e72..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java +++ /dev/null @@ -1,219 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import static org.junit.Assert.assertEquals; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.stream.IntStream; - -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.testclassification.ClientTests; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ MediumTests.class, ClientTests.class }) -public class TestAsyncTableSmallScan { - - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - private static TableName TABLE_NAME = TableName.valueOf("async"); - - private static byte[] FAMILY = Bytes.toBytes("cf"); - - private static byte[] QUALIFIER = Bytes.toBytes("cq"); - - private static int COUNT = 1000; - - private static AsyncConnection ASYNC_CONN; - - @BeforeClass - public static void setUp() throws Exception { - TEST_UTIL.startMiniCluster(3); - byte[][] splitKeys = new byte[8][]; - for (int i = 111; i < 999; i += 111) { - splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); - } - TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys); - TEST_UTIL.waitTableAvailable(TABLE_NAME); - ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()); - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); - List> futures = new ArrayList<>(); - IntStream.range(0, COUNT) - .forEach(i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i))) - .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))))); - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); - } - - @AfterClass - public static void tearDown() throws Exception { - ASYNC_CONN.close(); - TEST_UTIL.shutdownMiniCluster(); - } - - @Test - public void testScanAll() throws InterruptedException, ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); - List results = table.smallScan(new Scan().setSmall(true)).get(); - assertEquals(COUNT, results.size()); - IntStream.range(0, COUNT).forEach(i -> { - Result result = results.get(i); - assertEquals(String.format("%03d", i), Bytes.toString(result.getRow())); - assertEquals(i, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); - }); - } - - @Test - public void testReversedScanAll() throws InterruptedException, ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); - List results = table.smallScan(new Scan().setSmall(true).setReversed(true)).get(); - assertEquals(COUNT, results.size()); - IntStream.range(0, COUNT).forEach(i -> { - Result result = results.get(i); - int actualIndex = COUNT - i - 1; - assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow())); - assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); - }); - } - - @Test - public void testScanNoStopKey() throws InterruptedException, ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); - int start = 345; - List results = table - .smallScan(new Scan(Bytes.toBytes(String.format("%03d", start))).setSmall(true)).get(); - assertEquals(COUNT - start, results.size()); - IntStream.range(0, COUNT - start).forEach(i -> { - Result result = results.get(i); - int actualIndex = start + i; - assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow())); - assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); - }); - } - - @Test - public void testReverseScanNoStopKey() throws InterruptedException, ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); - int start = 765; - List results = table - .smallScan( - new Scan(Bytes.toBytes(String.format("%03d", start))).setSmall(true).setReversed(true)) - .get(); - assertEquals(start + 1, results.size()); - IntStream.range(0, start + 1).forEach(i -> { - Result result = results.get(i); - int actualIndex = start - i; - assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow())); - assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); - }); - } - - private void testScan(int start, int stop) throws InterruptedException, ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); - List results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start))) - .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true)).get(); - assertEquals(stop - start, results.size()); - IntStream.range(0, stop - start).forEach(i -> { - Result result = results.get(i); - int actualIndex = start + i; - assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow())); - assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); - }); - } - - private void testReversedScan(int start, int stop) - throws InterruptedException, ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); - List results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start))) - .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true).setReversed(true)) - .get(); - assertEquals(start - stop, results.size()); - IntStream.range(0, start - stop).forEach(i -> { - Result result = results.get(i); - int actualIndex = start - i; - assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow())); - assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); - }); - } - - @Test - public void testScanWithStartKeyAndStopKey() throws InterruptedException, ExecutionException { - testScan(345, 567); - } - - @Test - public void testReversedScanWithStartKeyAndStopKey() - throws InterruptedException, ExecutionException { - testReversedScan(765, 543); - } - - @Test - public void testScanAtRegionBoundary() throws InterruptedException, ExecutionException { - testScan(222, 333); - } - - @Test - public void testReversedScanAtRegionBoundary() throws InterruptedException, ExecutionException { - testScan(222, 333); - } - - @Test - public void testScanWithLimit() throws InterruptedException, ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); - int start = 111; - int stop = 888; - int limit = 300; - List results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start))) - .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true), - limit).get(); - assertEquals(limit, results.size()); - IntStream.range(0, limit).forEach(i -> { - Result result = results.get(i); - int actualIndex = start + i; - assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow())); - assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); - }); - } - - @Test - public void testReversedScanWithLimit() throws InterruptedException, ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); - int start = 888; - int stop = 111; - int limit = 300; - List results = table.smallScan( - new Scan(Bytes.toBytes(String.format("%03d", start))) - .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true).setReversed(true), - limit).get(); - assertEquals(limit, results.size()); - IntStream.range(0, limit).forEach(i -> { - Result result = results.get(i); - int actualIndex = start - i; - assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow())); - assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, QUALIFIER))); - }); - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index ae93e678788..89841a93da5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -26,7 +26,6 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -5934,7 +5933,7 @@ public class TestFromClientSide { public void testReversedScanUnderMultiRegions() throws Exception { // Test Initialization. TableName TABLE = TableName.valueOf("testReversedScanUnderMultiRegions"); - byte[] maxByteArray = ConnectionUtils.MAX_BYTE_ARRAY; + byte[] maxByteArray = ReversedClientScanner.MAX_BYTE_ARRAY; byte[][] splitRows = new byte[][] { Bytes.toBytes("005"), Bytes.add(Bytes.toBytes("005"), Bytes.multiple(maxByteArray, 16)), Bytes.toBytes("006"),