From 85d701892ed969380a8bcca9c9f4e306c74af941 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Tue, 24 Jan 2017 21:07:25 +0800 Subject: [PATCH] HBASE-17045 Unify the implementation of small scan and regular scan --- .../hbase/client/AsyncClientScanner.java | 23 +- .../client/AsyncNonMetaRegionLocator.java | 4 +- .../client/AsyncRpcRetryingCallerFactory.java | 84 +-- ...syncScanSingleRegionRpcRetryingCaller.java | 49 +- .../AsyncSmallScanRpcRetryingCaller.java | 194 ------ .../hadoop/hbase/client/AsyncTableBase.java | 40 +- .../hadoop/hbase/client/AsyncTableImpl.java | 5 +- .../hbase/client/RawAsyncTableImpl.java | 56 +- .../hbase/client/RawScanResultConsumer.java | 4 +- .../org/apache/hadoop/hbase/client/Scan.java | 146 +++-- .../hadoop/hbase/client/ScannerCallable.java | 2 +- .../hadoop/hbase/protobuf/ProtobufUtil.java | 34 ++ .../hbase/shaded/protobuf/ProtobufUtil.java | 35 ++ .../shaded/protobuf/RequestConverter.java | 24 +- .../protobuf/generated/ClientProtos.java | 553 +++++++++++++---- .../src/main/protobuf/Client.proto | 11 +- .../protobuf/generated/ClientProtos.java | 556 ++++++++++++++---- hbase-protocol/src/main/protobuf/Client.proto | 11 +- .../hbase/regionserver/RSRpcServices.java | 18 + .../hbase/regionserver/StoreScanner.java | 38 +- .../client/AbstractTestAsyncTableScan.java | 6 + ...llScan.java => TestAsyncTableScanAll.java} | 61 +- .../hbase/client/TestRawAsyncTableScan.java | 5 - .../hbase/filter/TestMultiRowRangeFilter.java | 4 + 24 files changed, 1295 insertions(+), 668 deletions(-) delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java rename hbase-server/src/test/java/org/apache/hadoop/hbase/client/{TestAsyncTableSmallScan.java => TestAsyncTableScanAll.java} (62%) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java index f656a6c61bd..b9fd34f1046 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; /** * The asynchronous client scanner implementation. @@ -95,12 +96,16 @@ class AsyncClientScanner { public final ClientService.Interface stub; - public final long scannerId; + public final HBaseRpcController controller; - public OpenScannerResponse(HRegionLocation loc, Interface stub, long scannerId) { + public final ScanResponse resp; + + public OpenScannerResponse(HRegionLocation loc, Interface stub, HBaseRpcController controller, + ScanResponse resp) { this.loc = loc; this.stub = stub; - this.scannerId = scannerId; + this.controller = controller; + this.resp = resp; } } @@ -108,14 +113,14 @@ class AsyncClientScanner { HRegionLocation loc, ClientService.Interface stub) { CompletableFuture future = new CompletableFuture<>(); try { - ScanRequest request = - RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), scan, 0, false); + ScanRequest request = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), + scan, scan.getCaching(), false); stub.scan(controller, request, resp -> { if (controller.failed()) { future.completeExceptionally(controller.getFailed()); return; } - future.complete(new OpenScannerResponse(loc, stub, resp.getScannerId())); + future.complete(new OpenScannerResponse(loc, stub, controller, resp)); }); } catch (IOException e) { future.completeExceptionally(e); @@ -124,11 +129,11 @@ class AsyncClientScanner { } private void startScan(OpenScannerResponse resp) { - conn.callerFactory.scanSingleRegion().id(resp.scannerId).location(resp.loc).stub(resp.stub) - .setScan(scan).consumer(consumer).resultCache(resultCache) + conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc) + .stub(resp.stub).setScan(scan).consumer(consumer).resultCache(resultCache) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) - .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).start() + .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp) .whenComplete((hasMore, error) -> { if (error != null) { consumer.onError(error); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java index ae79b650f3c..2c8669fba6d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Scan.ReadType; import org.apache.hadoop.hbase.util.Bytes; /** @@ -376,7 +377,8 @@ class AsyncNonMetaRegionLocator { metaKey = createRegionName(tableName, req.row, NINES, false); } conn.getRawTable(META_TABLE_NAME) - .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1) + .scanAll(new Scan().withStartRow(metaKey).setReversed(true).setReadType(ReadType.PREAD) + .addFamily(CATALOG_FAMILY).setLimit(1)) .whenComplete((results, error) -> onScanComplete(tableName, req, results, error)); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index 5df66ccc236..6bc2cc19f58 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -30,7 +30,9 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; /** * Factory to create an AsyncRpcRetryCaller. @@ -138,81 +140,6 @@ class AsyncRpcRetryingCallerFactory { return new SingleRequestCallerBuilder<>(); } - public class SmallScanCallerBuilder extends BuilderBase { - - private TableName tableName; - - private Scan scan; - - private int limit; - - private long scanTimeoutNs = -1L; - - private long rpcTimeoutNs = -1L; - - public SmallScanCallerBuilder table(TableName tableName) { - this.tableName = tableName; - return this; - } - - public SmallScanCallerBuilder setScan(Scan scan) { - this.scan = scan; - return this; - } - - public SmallScanCallerBuilder limit(int limit) { - this.limit = limit; - return this; - } - - public SmallScanCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) { - this.scanTimeoutNs = unit.toNanos(scanTimeout); - return this; - } - - public SmallScanCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { - this.rpcTimeoutNs = unit.toNanos(rpcTimeout); - return this; - } - - public SmallScanCallerBuilder pause(long pause, TimeUnit unit) { - this.pauseNs = unit.toNanos(pause); - return this; - } - - public SmallScanCallerBuilder maxAttempts(int maxAttempts) { - this.maxAttempts = maxAttempts; - return this; - } - - public SmallScanCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { - this.startLogErrorsCnt = startLogErrorsCnt; - return this; - } - - public AsyncSmallScanRpcRetryingCaller build() { - TableName tableName = checkNotNull(this.tableName, "tableName is null"); - Scan scan = checkNotNull(this.scan, "scan is null"); - checkArgument(limit > 0, "invalid limit %d", limit); - return new AsyncSmallScanRpcRetryingCaller(conn, tableName, scan, limit, pauseNs, maxAttempts, - scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); - } - - /** - * Shortcut for {@code build().call()} - */ - public CompletableFuture> call() { - return build().call(); - } - } - - /** - * Create retry caller for small scan. - */ - public SmallScanCallerBuilder smallScan() { - return new SmallScanCallerBuilder(); - } - public class ScanSingleRegionCallerBuilder extends BuilderBase { private long scannerId = -1L; @@ -297,10 +224,11 @@ class AsyncRpcRetryingCallerFactory { } /** - * Short cut for {@code build().start()}. + * Short cut for {@code build().start(HBaseRpcController, ScanResponse)}. */ - public CompletableFuture start() { - return build().start(); + public CompletableFuture start(HBaseRpcController controller, + ScanResponse respWhenOpen) { + return build().start(controller, respWhenOpen); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index 5d3b736f98d..3ef4a6fcae9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -17,11 +17,10 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; +import static org.apache.hadoop.hbase.client.ConnectionUtils.*; import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan; import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan; import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; -import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; import io.netty.util.HashedWheelTimer; @@ -135,6 +134,10 @@ class AsyncScanSingleRegionRpcRetryingCaller { return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs); } + private long remainingTimeNs() { + return scanTimeoutNs - (System.nanoTime() - nextCallStartNs); + } + private void closeScanner() { resetController(controller, rpcTimeoutNs); ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false); @@ -199,7 +202,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { } long delayNs; if (scanTimeoutNs > 0) { - long maxDelayNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs); + long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; if (maxDelayNs <= 0) { completeExceptionally(!scannerClosed); return; @@ -245,7 +248,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { } } - private void onComplete(ScanResponse resp) { + private void onComplete(HBaseRpcController controller, ScanResponse resp) { if (controller.failed()) { onError(controller.getFailed()); return; @@ -288,6 +291,13 @@ class AsyncScanSingleRegionRpcRetryingCaller { completeNoMoreResults(); return; } + if (scan.getLimit() > 0) { + // The RS should have set the moreResults field in ScanResponse to false when we have reached + // the limit. + int limit = scan.getLimit() - results.length; + assert limit > 0; + scan.setLimit(limit); + } // as in 2.0 this value will always be set if (!resp.getMoreResultsInRegion()) { completeWhenNoMoreResultsInRegion.run(); @@ -297,10 +307,26 @@ class AsyncScanSingleRegionRpcRetryingCaller { } private void call() { - resetController(controller, rpcTimeoutNs); + // As we have a call sequence for scan, it is useless to have a different rpc timeout which is + // less than the scan timeout. If the server does not respond in time(usually this will not + // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when + // resending the next request and the only way to fix this is to close the scanner and open a + // new one. + long callTimeoutNs; + if (scanTimeoutNs > 0) { + long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs); + if (remainingNs <= 0) { + completeExceptionally(true); + return; + } + callTimeoutNs = remainingNs; + } else { + callTimeoutNs = 0L; + } + resetController(controller, callTimeoutNs); ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false, - nextCallSeq, false, false); - stub.scan(controller, req, this::onComplete); + nextCallSeq, false, false, scan.getLimit()); + stub.scan(controller, req, resp -> onComplete(controller, resp)); } private void next() { @@ -312,10 +338,15 @@ class AsyncScanSingleRegionRpcRetryingCaller { } /** + * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also + * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the + * open scanner request is also needed because we may have some data in the CellScanner which is + * contained in the controller. * @return {@code true} if we should continue, otherwise {@code false}. */ - public CompletableFuture start() { - next(); + public CompletableFuture start(HBaseRpcController controller, + ScanResponse respWhenOpen) { + onComplete(controller, respWhenOpen); return future; } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java deleted file mode 100644 index 98a276f884a..00000000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java +++ /dev/null @@ -1,194 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType; -import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan; -import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; - -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ipc.HBaseRpcController; -import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; -import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; - -/** - * Retry caller for smaller scan. - */ -@InterfaceAudience.Private -class AsyncSmallScanRpcRetryingCaller { - - private final AsyncConnectionImpl conn; - - private final TableName tableName; - - private final Scan scan; - - private final int limit; - - private final long scanTimeoutNs; - - private final long rpcTimeoutNs; - - private final long pauseNs; - - private final int maxAttempts; - - private final int startLogErrosCnt; - - private final Function nextScan; - - private final List resultList; - - private final CompletableFuture> future; - - public AsyncSmallScanRpcRetryingCaller(AsyncConnectionImpl conn, TableName tableName, Scan scan, - int limit, long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, - int startLogErrosCnt) { - this.conn = conn; - this.tableName = tableName; - this.scan = scan; - this.limit = limit; - this.scanTimeoutNs = scanTimeoutNs; - this.rpcTimeoutNs = rpcTimeoutNs; - this.pauseNs = pauseNs; - this.maxAttempts = maxAttempts; - this.startLogErrosCnt = startLogErrosCnt; - if (scan.isReversed()) { - this.nextScan = this::reversedNextScan; - } else { - this.nextScan = this::nextScan; - } - this.resultList = new ArrayList<>(); - this.future = new CompletableFuture<>(); - } - - private static final class SmallScanResponse { - - public final Result[] results; - - public final HRegionInfo currentRegion; - - public final boolean hasMoreResultsInRegion; - - public SmallScanResponse(Result[] results, HRegionInfo currentRegion, - boolean hasMoreResultsInRegion) { - this.results = results; - this.currentRegion = currentRegion; - this.hasMoreResultsInRegion = hasMoreResultsInRegion; - } - } - - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", - justification = "Findbugs seems to be confused by lambda expression.") - private CompletableFuture scan(HBaseRpcController controller, - HRegionLocation loc, ClientService.Interface stub) { - CompletableFuture future = new CompletableFuture<>(); - ScanRequest req; - try { - req = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), scan, - limit - resultList.size(), true); - } catch (IOException e) { - future.completeExceptionally(e); - return future; - } - stub.scan(controller, req, resp -> { - if (controller.failed()) { - future.completeExceptionally(controller.getFailed()); - } else { - try { - Result[] results = ResponseConverter.getResults(controller.cellScanner(), resp); - future.complete( - new SmallScanResponse(results, loc.getRegionInfo(), resp.getMoreResultsInRegion())); - } catch (IOException e) { - future.completeExceptionally(e); - } - } - }); - return future; - } - - private void onComplete(SmallScanResponse resp) { - resultList.addAll(Arrays.asList(resp.results)); - if (resultList.size() == limit) { - future.complete(resultList); - return; - } - if (resp.hasMoreResultsInRegion) { - if (resp.results.length > 0) { - scan.withStartRow(resp.results[resp.results.length - 1].getRow(), false); - } - scan(); - return; - } - if (!nextScan.apply(resp.currentRegion)) { - future.complete(resultList); - } - } - - private void scan() { - conn.callerFactory. single().table(tableName).row(scan.getStartRow()) - .locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) - .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrosCnt).action(this::scan).call() - .whenComplete((resp, error) -> { - if (error != null) { - future.completeExceptionally(error); - } else { - onComplete(resp); - } - }); - } - - public CompletableFuture> call() { - scan(); - return future; - } - - private boolean nextScan(HRegionInfo info) { - if (noMoreResultsForScan(scan, info)) { - return false; - } else { - scan.withStartRow(info.getEndKey()); - scan(); - return true; - } - } - - private boolean reversedNextScan(HRegionInfo info) { - if (noMoreResultsForReverseScan(scan, info)) { - return false; - } else { - scan.withStartRow(info.getStartKey(), false); - scan(); - return true; - } - } -} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java index d82fa22cf0c..e201ab23a11 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java @@ -300,26 +300,34 @@ public interface AsyncTableBase { CompareOp compareOp, byte[] value, RowMutations mutation); /** - * Just call {@link #smallScan(Scan, int)} with {@link Integer#MAX_VALUE}. - * @see #smallScan(Scan, int) - */ - default CompletableFuture> smallScan(Scan scan) { - return smallScan(scan, Integer.MAX_VALUE); - } - - /** - * Return all the results that match the given scan object. The number of the returned results - * will not be greater than {@code limit}. + * Return all the results that match the given scan object. *

- * Notice that the scan must be small, and should not use batch or allowPartialResults. The - * {@code caching} property of the scan object is also ignored as we will use {@code limit} - * instead. - * @param scan A configured {@link Scan} object. - * @param limit the limit of results count + * Notice that usually you should use this method with a {@link Scan} object that has limit set. + * For example, if you want to get the closest row after a given row, you could do this: + *

+ * + *

+   * 
+   * table.scanAll(new Scan().withStartRow(row, false).setLimit(1)).thenAccept(results -> {
+   *   if (results.isEmpty()) {
+   *      System.out.println("No row after " + Bytes.toStringBinary(row));
+   *   } else {
+   *     System.out.println("The closest row after " + Bytes.toStringBinary(row) + " is "
+   *         + Bytes.toStringBinary(results.stream().findFirst().get().getRow()));
+   *   }
+   * });
+   * 
+   * 
+ *

+ * If your result set is very large, you should use other scan method to get a scanner or use + * callback to process the results. They will do chunking to prevent OOM. The scanAll method will + * fetch all the results and store them in a List and then return the list to you. + * @param scan A configured {@link Scan} object. SO if you use this method to fetch a really large + * result set, it is likely to cause OOM. * @return The results of this small scan operation. The return value will be wrapped by a * {@link CompletableFuture}. */ - CompletableFuture> smallScan(Scan scan, int limit); + CompletableFuture> scanAll(Scan scan); /** * Test for the existence of columns in the table, as specified by the Gets. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index 7cd257cfebd..f1625ad705c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -144,8 +144,8 @@ class AsyncTableImpl implements AsyncTable { } @Override - public CompletableFuture> smallScan(Scan scan, int limit) { - return wrap(rawTable.smallScan(scan, limit)); + public CompletableFuture> scanAll(Scan scan) { + return wrap(rawTable.scanAll(scan)); } private long resultSize2CacheSize(long maxResultSize) { @@ -197,4 +197,5 @@ class AsyncTableImpl implements AsyncTable { public List> batch(List actions) { return rawTable. batch(actions).stream().map(this::wrap).collect(toList()); } + } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index d9d2d35c88c..87323ac278b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -21,13 +21,13 @@ import static java.util.stream.Collectors.toList; import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; @@ -58,8 +58,6 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; @InterfaceAudience.Private class RawAsyncTableImpl implements RawAsyncTable { - private static final Log LOG = LogFactory.getLog(RawAsyncTableImpl.class); - private final AsyncConnectionImpl conn; private final TableName tableName; @@ -332,12 +330,6 @@ class RawAsyncTableImpl implements RawAsyncTable { .call(); } - private CompletableFuture failedFuture(Throwable error) { - CompletableFuture future = new CompletableFuture<>(); - future.completeExceptionally(error); - return future; - } - private Scan setDefaultScanConfig(Scan scan) { // always create a new scan object as we may reset the start row later. Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan); @@ -351,27 +343,35 @@ class RawAsyncTableImpl implements RawAsyncTable { } @Override - public CompletableFuture> smallScan(Scan scan, int limit) { - if (!scan.isSmall()) { - return failedFuture(new IllegalArgumentException("Only small scan is allowed")); - } - if (scan.getBatch() > 0 || scan.getAllowPartialResults()) { - return failedFuture( - new IllegalArgumentException("Batch and allowPartial is not allowed for small scan")); - } - return conn.callerFactory.smallScan().table(tableName).setScan(setDefaultScanConfig(scan)) - .limit(limit).scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS) - .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) - .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call(); + public CompletableFuture> scanAll(Scan scan) { + CompletableFuture> future = new CompletableFuture<>(); + List scanResults = new ArrayList<>(); + scan(scan, new RawScanResultConsumer() { + + @Override + public boolean onNext(Result[] results) { + scanResults.addAll(Arrays.asList(results)); + return true; + } + + @Override + public void onError(Throwable error) { + future.completeExceptionally(error); + } + + @Override + public void onComplete() { + future.complete(scanResults); + } + }); + return future; } public void scan(Scan scan, RawScanResultConsumer consumer) { - if (scan.isSmall()) { + if (scan.isSmall() || scan.getLimit() > 0) { if (scan.getBatch() > 0 || scan.getAllowPartialResults()) { - consumer.onError( - new IllegalArgumentException("Batch and allowPartial is not allowed for small scan")); - } else { - LOG.warn("This is small scan " + scan + ", consider using smallScan directly?"); + consumer.onError(new IllegalArgumentException( + "Batch and allowPartial is not allowed for small scan or limited scan")); } } scan = setDefaultScanConfig(scan); @@ -388,6 +388,7 @@ class RawAsyncTableImpl implements RawAsyncTable { public List> put(List puts) { return voidMutate(puts); } + @Override public List> delete(List deletes) { return voidMutate(deletes); @@ -434,4 +435,5 @@ class RawAsyncTableImpl implements RawAsyncTable { public long getScanTimeout(TimeUnit unit) { return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS); } + } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java index 7f0514c4c26..2e5d422f72e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java @@ -47,7 +47,9 @@ public interface RawScanResultConsumer { * This method give you a chance to terminate a slow scan operation. * @return {@code false} if you want to terminate the scan process. Otherwise {@code true} */ - boolean onHeartbeat(); + default boolean onHeartbeat() { + return true; + } /** * Indicate that we hit an unrecoverable error and the scan operation is terminated. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index 8d53b9a6583..31e76da06f8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -46,38 +46,45 @@ import org.apache.hadoop.hbase.util.Bytes; /** * Used to perform Scan operations. *

- * All operations are identical to {@link Get} with the exception of - * instantiation. Rather than specifying a single row, an optional startRow - * and stopRow may be defined. If rows are not specified, the Scanner will - * iterate over all rows. + * All operations are identical to {@link Get} with the exception of instantiation. Rather than + * specifying a single row, an optional startRow and stopRow may be defined. If rows are not + * specified, the Scanner will iterate over all rows. *

* To get all columns from all rows of a Table, create an instance with no constraints; use the - * {@link #Scan()} constructor. To constrain the scan to specific column families, - * call {@link #addFamily(byte[]) addFamily} for each family to retrieve on your Scan instance. + * {@link #Scan()} constructor. To constrain the scan to specific column families, call + * {@link #addFamily(byte[]) addFamily} for each family to retrieve on your Scan instance. *

- * To get specific columns, call {@link #addColumn(byte[], byte[]) addColumn} - * for each column to retrieve. + * To get specific columns, call {@link #addColumn(byte[], byte[]) addColumn} for each column to + * retrieve. *

- * To only retrieve columns within a specific range of version timestamps, - * call {@link #setTimeRange(long, long) setTimeRange}. + * To only retrieve columns within a specific range of version timestamps, call + * {@link #setTimeRange(long, long) setTimeRange}. *

- * To only retrieve columns with a specific timestamp, call - * {@link #setTimeStamp(long) setTimestamp}. + * To only retrieve columns with a specific timestamp, call {@link #setTimeStamp(long) setTimestamp} + * . *

- * To limit the number of versions of each column to be returned, call - * {@link #setMaxVersions(int) setMaxVersions}. + * To limit the number of versions of each column to be returned, call {@link #setMaxVersions(int) + * setMaxVersions}. *

- * To limit the maximum number of values returned for each call to next(), - * call {@link #setBatch(int) setBatch}. + * To limit the maximum number of values returned for each call to next(), call + * {@link #setBatch(int) setBatch}. *

* To add a filter, call {@link #setFilter(org.apache.hadoop.hbase.filter.Filter) setFilter}. *

- * Expert: To explicitly disable server-side block caching for this scan, - * execute {@link #setCacheBlocks(boolean)}. - *

Note: Usage alters Scan instances. Internally, attributes are updated as the Scan - * runs and if enabled, metrics accumulate in the Scan instance. Be aware this is the case when - * you go to clone a Scan instance or if you go to reuse a created Scan instance; safer is create - * a Scan instance per usage. + * For small scan, it is deprecated in 2.0.0. Now we have a {@link #setLimit(int)} method in Scan + * object which is used to tell RS how many rows we want. If the rows return reaches the limit, the + * RS will close the RegionScanner automatically. And we will also fetch data when openScanner in + * the new implementation, this means we can also finish a scan operation in one rpc call. And we + * have also introduced a {@link #setReadType(ReadType)} method. You can use this method to tell RS + * to use pread explicitly. + *

+ * Expert: To explicitly disable server-side block caching for this scan, execute + * {@link #setCacheBlocks(boolean)}. + *

+ * Note: Usage alters Scan instances. Internally, attributes are updated as the Scan runs + * and if enabled, metrics accumulate in the Scan instance. Be aware this is the case when you go to + * clone a Scan instance or if you go to reuse a created Scan instance; safer is create a Scan + * instance per usage. */ @InterfaceAudience.Public @InterfaceStability.Stable @@ -86,9 +93,9 @@ public class Scan extends Query { private static final String RAW_ATTR = "_raw_"; - private byte [] startRow = HConstants.EMPTY_START_ROW; + private byte[] startRow = HConstants.EMPTY_START_ROW; private boolean includeStartRow = true; - private byte [] stopRow = HConstants.EMPTY_END_ROW; + private byte[] stopRow = HConstants.EMPTY_END_ROW; private boolean includeStopRow = false; private int maxVersions = 1; private int batch = -1; @@ -171,6 +178,16 @@ public class Scan extends Query { */ private long mvccReadPoint = -1L; + /** + * The number of rows we want for this scan. We will terminate the scan if the number of return + * rows reaches this value. + */ + private int limit = -1; + + /** + * Control whether to use pread at server side. + */ + private ReadType readType = ReadType.DEFAULT; /** * Create a Scan operation across all rows. */ @@ -257,6 +274,7 @@ public class Scan extends Query { setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax()); } this.mvccReadPoint = scan.getMvccReadPoint(); + this.limit = scan.getLimit(); } /** @@ -969,37 +987,36 @@ public class Scan extends Query { return attr == null ? false : Bytes.toBoolean(attr); } - - /** * Set whether this scan is a small scan *

- * Small scan should use pread and big scan can use seek + read - * - * seek + read is fast but can cause two problem (1) resource contention (2) - * cause too much network io - * - * [89-fb] Using pread for non-compaction read request - * https://issues.apache.org/jira/browse/HBASE-7266 - * - * On the other hand, if setting it true, we would do - * openScanner,next,closeScanner in one RPC call. It means the better - * performance for small scan. [HBASE-9488]. - * - * Generally, if the scan range is within one data block(64KB), it could be - * considered as a small scan. - * + * Small scan should use pread and big scan can use seek + read seek + read is fast but can cause + * two problem (1) resource contention (2) cause too much network io [89-fb] Using pread for + * non-compaction read request https://issues.apache.org/jira/browse/HBASE-7266 On the other hand, + * if setting it true, we would do openScanner,next,closeScanner in one RPC call. It means the + * better performance for small scan. [HBASE-9488]. Generally, if the scan range is within one + * data block(64KB), it could be considered as a small scan. * @param small + * @deprecated since 2.0.0. Use {@link #setLimit(int)} and {@link #setReadType(ReadType)} instead. + * And for the one rpc optimization, now we will also fetch data when openScanner, and + * if the number of rows reaches the limit then we will close the scanner + * automatically which means we will fall back to one rpc. + * @see #setLimit(int) + * @see #setReadType(ReadType) */ + @Deprecated public Scan setSmall(boolean small) { this.small = small; + this.readType = ReadType.PREAD; return this; } /** * Get whether this scan is a small scan * @return true if small scan + * @deprecated since 2.0.0. See the comment of {@link #setSmall(boolean)} */ + @Deprecated public boolean isSmall() { return small; } @@ -1080,6 +1097,53 @@ public class Scan extends Query { return this; } + /** + * @return the limit of rows for this scan + */ + public int getLimit() { + return limit; + } + + /** + * Set the limit of rows for this scan. We will terminate the scan if the number of returned rows + * reaches this value. + *

+ * This condition will be tested at last, after all other conditions such as stopRow, filter, etc. + *

+ * Can not be used together with batch and allowPartial. + * @param limit the limit of rows for this scan + * @return this + */ + public Scan setLimit(int limit) { + this.limit = limit; + return this; + } + + @InterfaceAudience.Public + @InterfaceStability.Unstable + public enum ReadType { + DEFAULT, STREAM, PREAD + } + + /** + * @return the read type for this scan + */ + public ReadType getReadType() { + return readType; + } + + /** + * Set the read type for this scan. + *

+ * Notice that we may choose to use pread even if you specific {@link ReadType#STREAM} here. For + * example, we will always use pread if this is a get scan. + * @return this + */ + public Scan setReadType(ReadType readType) { + this.readType = readType; + return this; + } + /** * Get the mvcc read point used to open a scanner. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 642fae0a2a1..f867acb57a7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -194,7 +194,7 @@ public class ScannerCallable extends ClientServiceCallable { try { incRPCcallsMetrics(); request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, - this.scanMetrics != null, renew); + this.scanMetrics != null, renew, -1); ScanResponse response = null; response = getStub().scan(getRpcController(), request); // Client and RS maintain a nextCallSeq number during the scan. Every next() call diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index d3898d4c574..51a94ef16c8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -814,6 +814,32 @@ public final class ProtobufUtil { return get; } + public static ClientProtos.Scan.ReadType toReadType(Scan.ReadType readType) { + switch (readType) { + case DEFAULT: + return ClientProtos.Scan.ReadType.DEFAULT; + case STREAM: + return ClientProtos.Scan.ReadType.STREAM; + case PREAD: + return ClientProtos.Scan.ReadType.PREAD; + default: + throw new IllegalArgumentException("Unknown ReadType: " + readType); + } + } + + public static Scan.ReadType toReadType(ClientProtos.Scan.ReadType readType) { + switch (readType) { + case DEFAULT: + return Scan.ReadType.DEFAULT; + case STREAM: + return Scan.ReadType.STREAM; + case PREAD: + return Scan.ReadType.PREAD; + default: + throw new IllegalArgumentException("Unknown ReadType: " + readType); + } + } + /** * Convert a client Scan to a protocol buffer Scan * @@ -917,6 +943,9 @@ public final class ProtobufUtil { if (scan.includeStopRow()) { scanBuilder.setIncludeStopRow(true); } + if (scan.getReadType() != Scan.ReadType.DEFAULT) { + scanBuilder.setReadType(toReadType(scan.getReadType())); + } return scanBuilder.build(); } @@ -1015,6 +1044,11 @@ public final class ProtobufUtil { if (proto.hasMvccReadPoint()) { PackagePrivateFieldAccessor.setMvccReadPoint(scan, proto.getMvccReadPoint()); } + if (scan.isSmall()) { + scan.setReadType(Scan.ReadType.PREAD); + } else if (proto.hasReadType()) { + scan.setReadType(toReadType(proto.getReadType())); + } return scan; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 7764f658a9d..13ff92e0b36 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionLoadStats; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Scan.ReadType; import org.apache.hadoop.hbase.client.SnapshotDescription; import org.apache.hadoop.hbase.client.SnapshotType; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; @@ -928,6 +929,32 @@ public final class ProtobufUtil { return get; } + public static ClientProtos.Scan.ReadType toReadType(Scan.ReadType readType) { + switch (readType) { + case DEFAULT: + return ClientProtos.Scan.ReadType.DEFAULT; + case STREAM: + return ClientProtos.Scan.ReadType.STREAM; + case PREAD: + return ClientProtos.Scan.ReadType.PREAD; + default: + throw new IllegalArgumentException("Unknown ReadType: " + readType); + } + } + + public static Scan.ReadType toReadType(ClientProtos.Scan.ReadType readType) { + switch (readType) { + case DEFAULT: + return Scan.ReadType.DEFAULT; + case STREAM: + return Scan.ReadType.STREAM; + case PREAD: + return Scan.ReadType.PREAD; + default: + throw new IllegalArgumentException("Unknown ReadType: " + readType); + } + } + /** * Convert a client Scan to a protocol buffer Scan * @@ -1031,6 +1058,9 @@ public final class ProtobufUtil { if (scan.includeStopRow()) { scanBuilder.setIncludeStopRow(true); } + if (scan.getReadType() != Scan.ReadType.DEFAULT) { + scanBuilder.setReadType(toReadType(scan.getReadType())); + } return scanBuilder.build(); } @@ -1129,6 +1159,11 @@ public final class ProtobufUtil { if (proto.hasMvccReadPoint()) { PackagePrivateFieldAccessor.setMvccReadPoint(scan, proto.getMvccReadPoint()); } + if (scan.isSmall()) { + scan.setReadType(Scan.ReadType.PREAD); + } else if (proto.hasReadType()) { + scan.setReadType(toReadType(proto.getReadType())); + } return scan; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index fd08d9884f1..8de9ad84069 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -462,11 +462,10 @@ public final class RequestConverter { * @return a scan request * @throws IOException */ - public static ScanRequest buildScanRequest(final byte[] regionName, final Scan scan, - final int numberOfRows, final boolean closeScanner) throws IOException { + public static ScanRequest buildScanRequest(byte[] regionName, Scan scan, int numberOfRows, + boolean closeScanner) throws IOException { ScanRequest.Builder builder = ScanRequest.newBuilder(); - RegionSpecifier region = buildRegionSpecifier( - RegionSpecifierType.REGION_NAME, regionName); + RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName); builder.setNumberOfRows(numberOfRows); builder.setCloseScanner(closeScanner); builder.setRegion(region); @@ -474,19 +473,21 @@ public final class RequestConverter { builder.setClientHandlesPartials(true); builder.setClientHandlesHeartbeats(true); builder.setTrackScanMetrics(scan.isScanMetricsEnabled()); + if (scan.getLimit() > 0) { + builder.setLimitOfRows(scan.getLimit()); + } return builder.build(); } /** * Create a protocol buffer ScanRequest for a scanner id - * * @param scannerId * @param numberOfRows * @param closeScanner * @return a scan request */ - public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows, - final boolean closeScanner, final boolean trackMetrics) { + public static ScanRequest buildScanRequest(long scannerId, int numberOfRows, boolean closeScanner, + boolean trackMetrics) { ScanRequest.Builder builder = ScanRequest.newBuilder(); builder.setNumberOfRows(numberOfRows); builder.setCloseScanner(closeScanner); @@ -499,16 +500,14 @@ public final class RequestConverter { /** * Create a protocol buffer ScanRequest for a scanner id - * * @param scannerId * @param numberOfRows * @param closeScanner * @param nextCallSeq * @return a scan request */ - public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows, - final boolean closeScanner, final long nextCallSeq, final boolean trackMetrics, - final boolean renew) { + public static ScanRequest buildScanRequest(long scannerId, int numberOfRows, boolean closeScanner, + long nextCallSeq, boolean trackMetrics, boolean renew, int limitOfRows) { ScanRequest.Builder builder = ScanRequest.newBuilder(); builder.setNumberOfRows(numberOfRows); builder.setCloseScanner(closeScanner); @@ -518,6 +517,9 @@ public final class RequestConverter { builder.setClientHandlesHeartbeats(true); builder.setTrackScanMetrics(trackMetrics); builder.setRenew(renew); + if (limitOfRows > 0) { + builder.setLimitOfRows(limitOfRows); + } return builder.build(); } diff --git a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java index ef44295c1df..a6f0f43b756 100644 --- a/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java +++ b/hbase-protocol-shaded/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/generated/ClientProtos.java @@ -14487,13 +14487,13 @@ public final class ClientProtos { boolean getLoadColumnFamiliesOnDemand(); /** - * optional bool small = 14; + * optional bool small = 14 [deprecated = true]; */ - boolean hasSmall(); + @java.lang.Deprecated boolean hasSmall(); /** - * optional bool small = 14; + * optional bool small = 14 [deprecated = true]; */ - boolean getSmall(); + @java.lang.Deprecated boolean getSmall(); /** * optional bool reversed = 15 [default = false]; @@ -14581,6 +14581,15 @@ public final class ClientProtos { * optional bool include_stop_row = 22 [default = false]; */ boolean getIncludeStopRow(); + + /** + * optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT]; + */ + boolean hasReadType(); + /** + * optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT]; + */ + org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType getReadType(); } /** *

@@ -14624,6 +14633,7 @@ public final class ClientProtos {
       mvccReadPoint_ = 0L;
       includeStartRow_ = true;
       includeStopRow_ = false;
+      readType_ = 0;
     }
 
     @java.lang.Override
@@ -14798,6 +14808,17 @@ public final class ClientProtos {
               includeStopRow_ = input.readBool();
               break;
             }
+            case 184: {
+              int rawValue = input.readEnum();
+              org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType value = org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType.valueOf(rawValue);
+              if (value == null) {
+                unknownFields.mergeVarintField(23, rawValue);
+              } else {
+                bitField0_ |= 0x00080000;
+                readType_ = rawValue;
+              }
+              break;
+            }
           }
         }
       } catch (org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException e) {
@@ -14831,6 +14852,105 @@ public final class ClientProtos {
               org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.class, org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.Builder.class);
     }
 
+    /**
+     * Protobuf enum {@code hbase.pb.Scan.ReadType}
+     */
+    public enum ReadType
+        implements org.apache.hadoop.hbase.shaded.com.google.protobuf.ProtocolMessageEnum {
+      /**
+       * DEFAULT = 0;
+       */
+      DEFAULT(0),
+      /**
+       * STREAM = 1;
+       */
+      STREAM(1),
+      /**
+       * PREAD = 2;
+       */
+      PREAD(2),
+      ;
+
+      /**
+       * DEFAULT = 0;
+       */
+      public static final int DEFAULT_VALUE = 0;
+      /**
+       * STREAM = 1;
+       */
+      public static final int STREAM_VALUE = 1;
+      /**
+       * PREAD = 2;
+       */
+      public static final int PREAD_VALUE = 2;
+
+
+      public final int getNumber() {
+        return value;
+      }
+
+      /**
+       * @deprecated Use {@link #forNumber(int)} instead.
+       */
+      @java.lang.Deprecated
+      public static ReadType valueOf(int value) {
+        return forNumber(value);
+      }
+
+      public static ReadType forNumber(int value) {
+        switch (value) {
+          case 0: return DEFAULT;
+          case 1: return STREAM;
+          case 2: return PREAD;
+          default: return null;
+        }
+      }
+
+      public static org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.EnumLiteMap
+          internalGetValueMap() {
+        return internalValueMap;
+      }
+      private static final org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.EnumLiteMap<
+          ReadType> internalValueMap =
+            new org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.EnumLiteMap() {
+              public ReadType findValueByNumber(int number) {
+                return ReadType.forNumber(number);
+              }
+            };
+
+      public final org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.EnumValueDescriptor
+          getValueDescriptor() {
+        return getDescriptor().getValues().get(ordinal());
+      }
+      public final org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.EnumDescriptor
+          getDescriptorForType() {
+        return getDescriptor();
+      }
+      public static final org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.EnumDescriptor
+          getDescriptor() {
+        return org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.getDescriptor().getEnumTypes().get(0);
+      }
+
+      private static final ReadType[] VALUES = values();
+
+      public static ReadType valueOf(
+          org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
+        if (desc.getType() != getDescriptor()) {
+          throw new java.lang.IllegalArgumentException(
+            "EnumValueDescriptor is not for this type.");
+        }
+        return VALUES[desc.getIndex()];
+      }
+
+      private final int value;
+
+      private ReadType(int value) {
+        this.value = value;
+      }
+
+      // @@protoc_insertion_point(enum_scope:hbase.pb.Scan.ReadType)
+    }
+
     private int bitField0_;
     public static final int COLUMN_FIELD_NUMBER = 1;
     private java.util.List column_;
@@ -15090,15 +15210,15 @@ public final class ClientProtos {
     public static final int SMALL_FIELD_NUMBER = 14;
     private boolean small_;
     /**
-     * optional bool small = 14;
+     * optional bool small = 14 [deprecated = true];
      */
-    public boolean hasSmall() {
+    @java.lang.Deprecated public boolean hasSmall() {
       return ((bitField0_ & 0x00000800) == 0x00000800);
     }
     /**
-     * optional bool small = 14;
+     * optional bool small = 14 [deprecated = true];
      */
-    public boolean getSmall() {
+    @java.lang.Deprecated public boolean getSmall() {
       return small_;
     }
 
@@ -15243,6 +15363,22 @@ public final class ClientProtos {
       return includeStopRow_;
     }
 
+    public static final int READTYPE_FIELD_NUMBER = 23;
+    private int readType_;
+    /**
+     * optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];
+     */
+    public boolean hasReadType() {
+      return ((bitField0_ & 0x00080000) == 0x00080000);
+    }
+    /**
+     * optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];
+     */
+    public org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType getReadType() {
+      org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType result = org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType.valueOf(readType_);
+      return result == null ? org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType.DEFAULT : result;
+    }
+
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
       byte isInitialized = memoizedIsInitialized;
@@ -15345,6 +15481,9 @@ public final class ClientProtos {
       if (((bitField0_ & 0x00040000) == 0x00040000)) {
         output.writeBool(22, includeStopRow_);
       }
+      if (((bitField0_ & 0x00080000) == 0x00080000)) {
+        output.writeEnum(23, readType_);
+      }
       unknownFields.writeTo(output);
     }
 
@@ -15441,6 +15580,10 @@ public final class ClientProtos {
         size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream
           .computeBoolSize(22, includeStopRow_);
       }
+      if (((bitField0_ & 0x00080000) == 0x00080000)) {
+        size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream
+          .computeEnumSize(23, readType_);
+      }
       size += unknownFields.getSerializedSize();
       memoizedSize = size;
       return size;
@@ -15558,6 +15701,10 @@ public final class ClientProtos {
         result = result && (getIncludeStopRow()
             == other.getIncludeStopRow());
       }
+      result = result && (hasReadType() == other.hasReadType());
+      if (hasReadType()) {
+        result = result && readType_ == other.readType_;
+      }
       result = result && unknownFields.equals(other.unknownFields);
       return result;
     }
@@ -15666,6 +15813,10 @@ public final class ClientProtos {
         hash = (53 * hash) + org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.hashBoolean(
             getIncludeStopRow());
       }
+      if (hasReadType()) {
+        hash = (37 * hash) + READTYPE_FIELD_NUMBER;
+        hash = (53 * hash) + readType_;
+      }
       hash = (29 * hash) + unknownFields.hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -15863,6 +16014,8 @@ public final class ClientProtos {
         bitField0_ = (bitField0_ & ~0x00100000);
         includeStopRow_ = false;
         bitField0_ = (bitField0_ & ~0x00200000);
+        readType_ = 0;
+        bitField0_ = (bitField0_ & ~0x00400000);
         return this;
       }
 
@@ -15998,6 +16151,10 @@ public final class ClientProtos {
           to_bitField0_ |= 0x00040000;
         }
         result.includeStopRow_ = includeStopRow_;
+        if (((from_bitField0_ & 0x00400000) == 0x00400000)) {
+          to_bitField0_ |= 0x00080000;
+        }
+        result.readType_ = readType_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -16175,6 +16332,9 @@ public final class ClientProtos {
         if (other.hasIncludeStopRow()) {
           setIncludeStopRow(other.getIncludeStopRow());
         }
+        if (other.hasReadType()) {
+          setReadType(other.getReadType());
+        }
         this.mergeUnknownFields(other.unknownFields);
         onChanged();
         return this;
@@ -17251,30 +17411,30 @@ public final class ClientProtos {
 
       private boolean small_ ;
       /**
-       * optional bool small = 14;
+       * optional bool small = 14 [deprecated = true];
        */
-      public boolean hasSmall() {
+      @java.lang.Deprecated public boolean hasSmall() {
         return ((bitField0_ & 0x00002000) == 0x00002000);
       }
       /**
-       * optional bool small = 14;
+       * optional bool small = 14 [deprecated = true];
        */
-      public boolean getSmall() {
+      @java.lang.Deprecated public boolean getSmall() {
         return small_;
       }
       /**
-       * optional bool small = 14;
+       * optional bool small = 14 [deprecated = true];
        */
-      public Builder setSmall(boolean value) {
+      @java.lang.Deprecated public Builder setSmall(boolean value) {
         bitField0_ |= 0x00002000;
         small_ = value;
         onChanged();
         return this;
       }
       /**
-       * optional bool small = 14;
+       * optional bool small = 14 [deprecated = true];
        */
-      public Builder clearSmall() {
+      @java.lang.Deprecated public Builder clearSmall() {
         bitField0_ = (bitField0_ & ~0x00002000);
         small_ = false;
         onChanged();
@@ -17748,6 +17908,42 @@ public final class ClientProtos {
         onChanged();
         return this;
       }
+
+      private int readType_ = 0;
+      /**
+       * optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];
+       */
+      public boolean hasReadType() {
+        return ((bitField0_ & 0x00400000) == 0x00400000);
+      }
+      /**
+       * optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];
+       */
+      public org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType getReadType() {
+        org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType result = org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType.valueOf(readType_);
+        return result == null ? org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType.DEFAULT : result;
+      }
+      /**
+       * optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];
+       */
+      public Builder setReadType(org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Scan.ReadType value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        bitField0_ |= 0x00400000;
+        readType_ = value.getNumber();
+        onChanged();
+        return this;
+      }
+      /**
+       * optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT];
+       */
+      public Builder clearReadType() {
+        bitField0_ = (bitField0_ & ~0x00400000);
+        readType_ = 0;
+        onChanged();
+        return this;
+      }
       public final Builder setUnknownFields(
           final org.apache.hadoop.hbase.shaded.com.google.protobuf.UnknownFieldSet unknownFields) {
         return super.setUnknownFields(unknownFields);
@@ -17898,6 +18094,23 @@ public final class ClientProtos {
      * optional bool renew = 10 [default = false];
      */
     boolean getRenew();
+
+    /**
+     * 
+     * if we have returned limit_of_rows rows to client, then close the scanner.
+     * 
+ * + * optional uint32 limit_of_rows = 11 [default = 0]; + */ + boolean hasLimitOfRows(); + /** + *
+     * if we have returned limit_of_rows rows to client, then close the scanner.
+     * 
+ * + * optional uint32 limit_of_rows = 11 [default = 0]; + */ + int getLimitOfRows(); } /** *
@@ -17930,6 +18143,7 @@ public final class ClientProtos {
       clientHandlesHeartbeats_ = false;
       trackScanMetrics_ = false;
       renew_ = false;
+      limitOfRows_ = 0;
     }
 
     @java.lang.Override
@@ -18026,6 +18240,11 @@ public final class ClientProtos {
               renew_ = input.readBool();
               break;
             }
+            case 88: {
+              bitField0_ |= 0x00000400;
+              limitOfRows_ = input.readUInt32();
+              break;
+            }
           }
         }
       } catch (org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException e) {
@@ -18213,6 +18432,29 @@ public final class ClientProtos {
       return renew_;
     }
 
+    public static final int LIMIT_OF_ROWS_FIELD_NUMBER = 11;
+    private int limitOfRows_;
+    /**
+     * 
+     * if we have returned limit_of_rows rows to client, then close the scanner.
+     * 
+ * + * optional uint32 limit_of_rows = 11 [default = 0]; + */ + public boolean hasLimitOfRows() { + return ((bitField0_ & 0x00000400) == 0x00000400); + } + /** + *
+     * if we have returned limit_of_rows rows to client, then close the scanner.
+     * 
+ * + * optional uint32 limit_of_rows = 11 [default = 0]; + */ + public int getLimitOfRows() { + return limitOfRows_; + } + private byte memoizedIsInitialized = -1; public final boolean isInitialized() { byte isInitialized = memoizedIsInitialized; @@ -18267,6 +18509,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000200) == 0x00000200)) { output.writeBool(10, renew_); } + if (((bitField0_ & 0x00000400) == 0x00000400)) { + output.writeUInt32(11, limitOfRows_); + } unknownFields.writeTo(output); } @@ -18315,6 +18560,10 @@ public final class ClientProtos { size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream .computeBoolSize(10, renew_); } + if (((bitField0_ & 0x00000400) == 0x00000400)) { + size += org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream + .computeUInt32Size(11, limitOfRows_); + } size += unknownFields.getSerializedSize(); memoizedSize = size; return size; @@ -18382,6 +18631,11 @@ public final class ClientProtos { result = result && (getRenew() == other.getRenew()); } + result = result && (hasLimitOfRows() == other.hasLimitOfRows()); + if (hasLimitOfRows()) { + result = result && (getLimitOfRows() + == other.getLimitOfRows()); + } result = result && unknownFields.equals(other.unknownFields); return result; } @@ -18440,6 +18694,10 @@ public final class ClientProtos { hash = (53 * hash) + org.apache.hadoop.hbase.shaded.com.google.protobuf.Internal.hashBoolean( getRenew()); } + if (hasLimitOfRows()) { + hash = (37 * hash) + LIMIT_OF_ROWS_FIELD_NUMBER; + hash = (53 * hash) + getLimitOfRows(); + } hash = (29 * hash) + unknownFields.hashCode(); memoizedHashCode = hash; return hash; @@ -18599,6 +18857,8 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000100); renew_ = false; bitField0_ = (bitField0_ & ~0x00000200); + limitOfRows_ = 0; + bitField0_ = (bitField0_ & ~0x00000400); return this; } @@ -18671,6 +18931,10 @@ public final class ClientProtos { to_bitField0_ |= 0x00000200; } result.renew_ = renew_; + if (((from_bitField0_ & 0x00000400) == 0x00000400)) { + to_bitField0_ |= 0x00000400; + } + result.limitOfRows_ = limitOfRows_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -18743,6 +19007,9 @@ public final class ClientProtos { if (other.hasRenew()) { setRenew(other.getRenew()); } + if (other.hasLimitOfRows()) { + setLimitOfRows(other.getLimitOfRows()); + } this.mergeUnknownFields(other.unknownFields); onChanged(); return this; @@ -19272,6 +19539,54 @@ public final class ClientProtos { onChanged(); return this; } + + private int limitOfRows_ ; + /** + *
+       * if we have returned limit_of_rows rows to client, then close the scanner.
+       * 
+ * + * optional uint32 limit_of_rows = 11 [default = 0]; + */ + public boolean hasLimitOfRows() { + return ((bitField0_ & 0x00000400) == 0x00000400); + } + /** + *
+       * if we have returned limit_of_rows rows to client, then close the scanner.
+       * 
+ * + * optional uint32 limit_of_rows = 11 [default = 0]; + */ + public int getLimitOfRows() { + return limitOfRows_; + } + /** + *
+       * if we have returned limit_of_rows rows to client, then close the scanner.
+       * 
+ * + * optional uint32 limit_of_rows = 11 [default = 0]; + */ + public Builder setLimitOfRows(int value) { + bitField0_ |= 0x00000400; + limitOfRows_ = value; + onChanged(); + return this; + } + /** + *
+       * if we have returned limit_of_rows rows to client, then close the scanner.
+       * 
+ * + * optional uint32 limit_of_rows = 11 [default = 0]; + */ + public Builder clearLimitOfRows() { + bitField0_ = (bitField0_ & ~0x00000400); + limitOfRows_ = 0; + onChanged(); + return this; + } public final Builder setUnknownFields( final org.apache.hadoop.hbase.shaded.com.google.protobuf.UnknownFieldSet unknownFields) { return super.setUnknownFields(unknownFields); @@ -40834,7 +41149,7 @@ public final class ClientProtos { "tion\030\003 \001(\0132\023.hbase.pb.Condition\022\023\n\013nonce" + "_group\030\004 \001(\004\"E\n\016MutateResponse\022 \n\006result" + "\030\001 \001(\0132\020.hbase.pb.Result\022\021\n\tprocessed\030\002 " + - "\001(\010\"\233\005\n\004Scan\022 \n\006column\030\001 \003(\0132\020.hbase.pb." + + "\001(\010\"\203\006\n\004Scan\022 \n\006column\030\001 \003(\0132\020.hbase.pb." + "Column\022*\n\tattribute\030\002 \003(\0132\027.hbase.pb.Nam" + "eBytesPair\022\021\n\tstart_row\030\003 \001(\014\022\020\n\010stop_ro" + "w\030\004 \001(\014\022 \n\006filter\030\005 \001(\0132\020.hbase.pb.Filte" + @@ -40843,104 +41158,108 @@ public final class ClientProtos { "cks\030\010 \001(\010:\004true\022\022\n\nbatch_size\030\t \001(\r\022\027\n\017m" + "ax_result_size\030\n \001(\004\022\023\n\013store_limit\030\013 \001(" + "\r\022\024\n\014store_offset\030\014 \001(\r\022&\n\036load_column_f" + - "amilies_on_demand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\022\027" + - "\n\010reversed\030\017 \001(\010:\005false\0222\n\013consistency\030\020" + - " \001(\0162\025.hbase.pb.Consistency:\006STRONG\022\017\n\007c" + - "aching\030\021 \001(\r\022\035\n\025allow_partial_results\030\022 " + - "\001(\010\0226\n\rcf_time_range\030\023 \003(\0132\037.hbase.pb.Co" + - "lumnFamilyTimeRange\022\032\n\017mvcc_read_point\030\024", - " \001(\004:\0010\022\037\n\021include_start_row\030\025 \001(\010:\004true" + - "\022\037\n\020include_stop_row\030\026 \001(\010:\005false\"\246\002\n\013Sc" + - "anRequest\022)\n\006region\030\001 \001(\0132\031.hbase.pb.Reg" + - "ionSpecifier\022\034\n\004scan\030\002 \001(\0132\016.hbase.pb.Sc" + - "an\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016number_of_rows" + - "\030\004 \001(\r\022\025\n\rclose_scanner\030\005 \001(\010\022\025\n\rnext_ca" + - "ll_seq\030\006 \001(\004\022\037\n\027client_handles_partials\030" + - "\007 \001(\010\022!\n\031client_handles_heartbeats\030\010 \001(\010" + - "\022\032\n\022track_scan_metrics\030\t \001(\010\022\024\n\005renew\030\n " + - "\001(\010:\005false\"\266\002\n\014ScanResponse\022\030\n\020cells_per", - "_result\030\001 \003(\r\022\022\n\nscanner_id\030\002 \001(\004\022\024\n\014mor" + - "e_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022!\n\007results\030" + - "\005 \003(\0132\020.hbase.pb.Result\022\r\n\005stale\030\006 \001(\010\022\037" + - "\n\027partial_flag_per_result\030\007 \003(\010\022\036\n\026more_" + - "results_in_region\030\010 \001(\010\022\031\n\021heartbeat_mes" + - "sage\030\t \001(\010\022+\n\014scan_metrics\030\n \001(\0132\025.hbase" + - ".pb.ScanMetrics\022\032\n\017mvcc_read_point\030\013 \001(\004" + - ":\0010\"\240\002\n\024BulkLoadHFileRequest\022)\n\006region\030\001" + - " \002(\0132\031.hbase.pb.RegionSpecifier\022>\n\013famil" + - "y_path\030\002 \003(\0132).hbase.pb.BulkLoadHFileReq", - "uest.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\022" + - "+\n\010fs_token\030\004 \001(\0132\031.hbase.pb.DelegationT" + - "oken\022\022\n\nbulk_token\030\005 \001(\t\022\030\n\tcopy_file\030\006 " + - "\001(\010:\005false\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014" + - "\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022" + - "\016\n\006loaded\030\001 \002(\010\"V\n\017DelegationToken\022\022\n\nid" + - "entifier\030\001 \001(\014\022\020\n\010password\030\002 \001(\014\022\014\n\004kind" + - "\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026PrepareBulkLo" + - "adRequest\022\'\n\ntable_name\030\001 \002(\0132\023.hbase.pb" + - ".TableName\022)\n\006region\030\002 \001(\0132\031.hbase.pb.Re", - "gionSpecifier\"-\n\027PrepareBulkLoadResponse" + - "\022\022\n\nbulk_token\030\001 \002(\t\"W\n\026CleanupBulkLoadR" + - "equest\022\022\n\nbulk_token\030\001 \002(\t\022)\n\006region\030\002 \001" + - "(\0132\031.hbase.pb.RegionSpecifier\"\031\n\027Cleanup" + - "BulkLoadResponse\"a\n\026CoprocessorServiceCa" + - "ll\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n" + - "\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030C" + - "oprocessorServiceResult\022&\n\005value\030\001 \001(\0132\027" + - ".hbase.pb.NameBytesPair\"v\n\031CoprocessorSe" + - "rviceRequest\022)\n\006region\030\001 \002(\0132\031.hbase.pb.", - "RegionSpecifier\022.\n\004call\030\002 \002(\0132 .hbase.pb" + - ".CoprocessorServiceCall\"o\n\032CoprocessorSe" + - "rviceResponse\022)\n\006region\030\001 \002(\0132\031.hbase.pb" + - ".RegionSpecifier\022&\n\005value\030\002 \002(\0132\027.hbase." + - "pb.NameBytesPair\"\226\001\n\006Action\022\r\n\005index\030\001 \001" + - "(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.Mutation" + - "Proto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\0226\n\014ser" + - "vice_call\030\004 \001(\0132 .hbase.pb.CoprocessorSe" + - "rviceCall\"k\n\014RegionAction\022)\n\006region\030\001 \002(" + - "\0132\031.hbase.pb.RegionSpecifier\022\016\n\006atomic\030\002", - " \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Action\"c" + - "\n\017RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:" + - "\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022compacti" + - "onPressure\030\003 \001(\005:\0010\"j\n\024MultiRegionLoadSt" + - "ats\022)\n\006region\030\001 \003(\0132\031.hbase.pb.RegionSpe" + - "cifier\022\'\n\004stat\030\002 \003(\0132\031.hbase.pb.RegionLo" + - "adStats\"\336\001\n\021ResultOrException\022\r\n\005index\030\001" + - " \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Result\022*" + - "\n\texception\030\003 \001(\0132\027.hbase.pb.NameBytesPa" + - "ir\022:\n\016service_result\030\004 \001(\0132\".hbase.pb.Co", - "processorServiceResult\0220\n\tloadStats\030\005 \001(" + - "\0132\031.hbase.pb.RegionLoadStatsB\002\030\001\"x\n\022Regi" + - "onActionResult\0226\n\021resultOrException\030\001 \003(" + - "\0132\033.hbase.pb.ResultOrException\022*\n\texcept" + - "ion\030\002 \001(\0132\027.hbase.pb.NameBytesPair\"x\n\014Mu" + - "ltiRequest\022,\n\014regionAction\030\001 \003(\0132\026.hbase" + - ".pb.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\t" + - "condition\030\003 \001(\0132\023.hbase.pb.Condition\"\226\001\n" + - "\rMultiResponse\0228\n\022regionActionResult\030\001 \003" + - "(\0132\034.hbase.pb.RegionActionResult\022\021\n\tproc", - "essed\030\002 \001(\010\0228\n\020regionStatistics\030\003 \001(\0132\036." + - "hbase.pb.MultiRegionLoadStats*\'\n\013Consist" + - "ency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\263\005\n\rClien" + - "tService\0222\n\003Get\022\024.hbase.pb.GetRequest\032\025." + - "hbase.pb.GetResponse\022;\n\006Mutate\022\027.hbase.p" + - "b.MutateRequest\032\030.hbase.pb.MutateRespons" + - "e\0225\n\004Scan\022\025.hbase.pb.ScanRequest\032\026.hbase" + - ".pb.ScanResponse\022P\n\rBulkLoadHFile\022\036.hbas" + - "e.pb.BulkLoadHFileRequest\032\037.hbase.pb.Bul" + - "kLoadHFileResponse\022V\n\017PrepareBulkLoad\022 .", - "hbase.pb.PrepareBulkLoadRequest\032!.hbase." + - "pb.PrepareBulkLoadResponse\022V\n\017CleanupBul" + - "kLoad\022 .hbase.pb.CleanupBulkLoadRequest\032" + - "!.hbase.pb.CleanupBulkLoadResponse\022X\n\013Ex" + - "ecService\022#.hbase.pb.CoprocessorServiceR" + - "equest\032$.hbase.pb.CoprocessorServiceResp" + - "onse\022d\n\027ExecRegionServerService\022#.hbase." + - "pb.CoprocessorServiceRequest\032$.hbase.pb." + - "CoprocessorServiceResponse\0228\n\005Multi\022\026.hb" + - "ase.pb.MultiRequest\032\027.hbase.pb.MultiResp", - "onseBI\n1org.apache.hadoop.hbase.shaded.p" + - "rotobuf.generatedB\014ClientProtosH\001\210\001\001\240\001\001" + "amilies_on_demand\030\r \001(\010\022\021\n\005small\030\016 \001(\010B\002" + + "\030\001\022\027\n\010reversed\030\017 \001(\010:\005false\0222\n\013consisten" + + "cy\030\020 \001(\0162\025.hbase.pb.Consistency:\006STRONG\022" + + "\017\n\007caching\030\021 \001(\r\022\035\n\025allow_partial_result" + + "s\030\022 \001(\010\0226\n\rcf_time_range\030\023 \003(\0132\037.hbase.p" + + "b.ColumnFamilyTimeRange\022\032\n\017mvcc_read_poi", + "nt\030\024 \001(\004:\0010\022\037\n\021include_start_row\030\025 \001(\010:\004" + + "true\022\037\n\020include_stop_row\030\026 \001(\010:\005false\0222\n" + + "\010readType\030\027 \001(\0162\027.hbase.pb.Scan.ReadType" + + ":\007DEFAULT\".\n\010ReadType\022\013\n\007DEFAULT\020\000\022\n\n\006ST" + + "REAM\020\001\022\t\n\005PREAD\020\002\"\300\002\n\013ScanRequest\022)\n\006reg" + + "ion\030\001 \001(\0132\031.hbase.pb.RegionSpecifier\022\034\n\004" + + "scan\030\002 \001(\0132\016.hbase.pb.Scan\022\022\n\nscanner_id" + + "\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rclose_" + + "scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(\004\022\037\n\027" + + "client_handles_partials\030\007 \001(\010\022!\n\031client_", + "handles_heartbeats\030\010 \001(\010\022\032\n\022track_scan_m" + + "etrics\030\t \001(\010\022\024\n\005renew\030\n \001(\010:\005false\022\030\n\rli" + + "mit_of_rows\030\013 \001(\r:\0010\"\266\002\n\014ScanResponse\022\030\n" + + "\020cells_per_result\030\001 \003(\r\022\022\n\nscanner_id\030\002 " + + "\001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022!" + + "\n\007results\030\005 \003(\0132\020.hbase.pb.Result\022\r\n\005sta" + + "le\030\006 \001(\010\022\037\n\027partial_flag_per_result\030\007 \003(" + + "\010\022\036\n\026more_results_in_region\030\010 \001(\010\022\031\n\021hea" + + "rtbeat_message\030\t \001(\010\022+\n\014scan_metrics\030\n \001" + + "(\0132\025.hbase.pb.ScanMetrics\022\032\n\017mvcc_read_p", + "oint\030\013 \001(\004:\0010\"\240\002\n\024BulkLoadHFileRequest\022)" + + "\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifie" + + "r\022>\n\013family_path\030\002 \003(\0132).hbase.pb.BulkLo" + + "adHFileRequest.FamilyPath\022\026\n\016assign_seq_" + + "num\030\003 \001(\010\022+\n\010fs_token\030\004 \001(\0132\031.hbase.pb.D" + + "elegationToken\022\022\n\nbulk_token\030\005 \001(\t\022\030\n\tco" + + "py_file\030\006 \001(\010:\005false\032*\n\nFamilyPath\022\016\n\006fa" + + "mily\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFil" + + "eResponse\022\016\n\006loaded\030\001 \002(\010\"V\n\017DelegationT" + + "oken\022\022\n\nidentifier\030\001 \001(\014\022\020\n\010password\030\002 \001", + "(\014\022\014\n\004kind\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026Pre" + + "pareBulkLoadRequest\022\'\n\ntable_name\030\001 \002(\0132" + + "\023.hbase.pb.TableName\022)\n\006region\030\002 \001(\0132\031.h" + + "base.pb.RegionSpecifier\"-\n\027PrepareBulkLo" + + "adResponse\022\022\n\nbulk_token\030\001 \002(\t\"W\n\026Cleanu" + + "pBulkLoadRequest\022\022\n\nbulk_token\030\001 \002(\t\022)\n\006" + + "region\030\002 \001(\0132\031.hbase.pb.RegionSpecifier\"" + + "\031\n\027CleanupBulkLoadResponse\"a\n\026Coprocesso" + + "rServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_nam" + + "e\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007request\030", + "\004 \002(\014\"B\n\030CoprocessorServiceResult\022&\n\005val" + + "ue\030\001 \001(\0132\027.hbase.pb.NameBytesPair\"v\n\031Cop" + + "rocessorServiceRequest\022)\n\006region\030\001 \002(\0132\031" + + ".hbase.pb.RegionSpecifier\022.\n\004call\030\002 \002(\0132" + + " .hbase.pb.CoprocessorServiceCall\"o\n\032Cop" + + "rocessorServiceResponse\022)\n\006region\030\001 \002(\0132" + + "\031.hbase.pb.RegionSpecifier\022&\n\005value\030\002 \002(" + + "\0132\027.hbase.pb.NameBytesPair\"\226\001\n\006Action\022\r\n" + + "\005index\030\001 \001(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase.p" + + "b.MutationProto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.", + "Get\0226\n\014service_call\030\004 \001(\0132 .hbase.pb.Cop" + + "rocessorServiceCall\"k\n\014RegionAction\022)\n\006r" + + "egion\030\001 \002(\0132\031.hbase.pb.RegionSpecifier\022\016" + + "\n\006atomic\030\002 \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase.p" + + "b.Action\"c\n\017RegionLoadStats\022\027\n\014memstoreL" + + "oad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035" + + "\n\022compactionPressure\030\003 \001(\005:\0010\"j\n\024MultiRe" + + "gionLoadStats\022)\n\006region\030\001 \003(\0132\031.hbase.pb" + + ".RegionSpecifier\022\'\n\004stat\030\002 \003(\0132\031.hbase.p" + + "b.RegionLoadStats\"\336\001\n\021ResultOrException\022", + "\r\n\005index\030\001 \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase.p" + + "b.Result\022*\n\texception\030\003 \001(\0132\027.hbase.pb.N" + + "ameBytesPair\022:\n\016service_result\030\004 \001(\0132\".h" + + "base.pb.CoprocessorServiceResult\0220\n\tload" + + "Stats\030\005 \001(\0132\031.hbase.pb.RegionLoadStatsB\002" + + "\030\001\"x\n\022RegionActionResult\0226\n\021resultOrExce" + + "ption\030\001 \003(\0132\033.hbase.pb.ResultOrException" + + "\022*\n\texception\030\002 \001(\0132\027.hbase.pb.NameBytes" + + "Pair\"x\n\014MultiRequest\022,\n\014regionAction\030\001 \003" + + "(\0132\026.hbase.pb.RegionAction\022\022\n\nnonceGroup", + "\030\002 \001(\004\022&\n\tcondition\030\003 \001(\0132\023.hbase.pb.Con" + + "dition\"\226\001\n\rMultiResponse\0228\n\022regionAction" + + "Result\030\001 \003(\0132\034.hbase.pb.RegionActionResu" + + "lt\022\021\n\tprocessed\030\002 \001(\010\0228\n\020regionStatistic" + + "s\030\003 \001(\0132\036.hbase.pb.MultiRegionLoadStats*" + + "\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\001" + + "2\263\005\n\rClientService\0222\n\003Get\022\024.hbase.pb.Get" + + "Request\032\025.hbase.pb.GetResponse\022;\n\006Mutate" + + "\022\027.hbase.pb.MutateRequest\032\030.hbase.pb.Mut" + + "ateResponse\0225\n\004Scan\022\025.hbase.pb.ScanReque", + "st\032\026.hbase.pb.ScanResponse\022P\n\rBulkLoadHF" + + "ile\022\036.hbase.pb.BulkLoadHFileRequest\032\037.hb" + + "ase.pb.BulkLoadHFileResponse\022V\n\017PrepareB" + + "ulkLoad\022 .hbase.pb.PrepareBulkLoadReques" + + "t\032!.hbase.pb.PrepareBulkLoadResponse\022V\n\017" + + "CleanupBulkLoad\022 .hbase.pb.CleanupBulkLo" + + "adRequest\032!.hbase.pb.CleanupBulkLoadResp" + + "onse\022X\n\013ExecService\022#.hbase.pb.Coprocess" + + "orServiceRequest\032$.hbase.pb.CoprocessorS" + + "erviceResponse\022d\n\027ExecRegionServerServic", + "e\022#.hbase.pb.CoprocessorServiceRequest\032$" + + ".hbase.pb.CoprocessorServiceResponse\0228\n\005" + + "Multi\022\026.hbase.pb.MultiRequest\032\027.hbase.pb" + + ".MultiResponseBI\n1org.apache.hadoop.hbas" + + "e.shaded.protobuf.generatedB\014ClientProto" + + "sH\001\210\001\001\240\001\001" }; org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() { @@ -41042,13 +41361,13 @@ public final class ClientProtos { internal_static_hbase_pb_Scan_fieldAccessorTable = new org.apache.hadoop.hbase.shaded.com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_hbase_pb_Scan_descriptor, - new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", "AllowPartialResults", "CfTimeRange", "MvccReadPoint", "IncludeStartRow", "IncludeStopRow", }); + new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", "AllowPartialResults", "CfTimeRange", "MvccReadPoint", "IncludeStartRow", "IncludeStopRow", "ReadType", }); internal_static_hbase_pb_ScanRequest_descriptor = getDescriptor().getMessageTypes().get(12); internal_static_hbase_pb_ScanRequest_fieldAccessorTable = new org.apache.hadoop.hbase.shaded.com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_hbase_pb_ScanRequest_descriptor, - new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", "Renew", }); + new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", "Renew", "LimitOfRows", }); internal_static_hbase_pb_ScanResponse_descriptor = getDescriptor().getMessageTypes().get(13); internal_static_hbase_pb_ScanResponse_fieldAccessorTable = new diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto index 2793b895202..e5a10b0ce6f 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Client.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto @@ -249,7 +249,7 @@ message Scan { optional uint32 store_limit = 11; optional uint32 store_offset = 12; optional bool load_column_families_on_demand = 13; /* DO NOT add defaults to load_column_families_on_demand. */ - optional bool small = 14; + optional bool small = 14 [deprecated = true]; optional bool reversed = 15 [default = false]; optional Consistency consistency = 16 [default = STRONG]; optional uint32 caching = 17; @@ -258,6 +258,13 @@ message Scan { optional uint64 mvcc_read_point = 20 [default = 0]; optional bool include_start_row = 21 [default = true]; optional bool include_stop_row = 22 [default = false]; + + enum ReadType { + DEFAULT = 0; + STREAM = 1; + PREAD = 2; + } + optional ReadType readType = 23 [default = DEFAULT]; } /** @@ -282,6 +289,8 @@ message ScanRequest { optional bool client_handles_heartbeats = 8; optional bool track_scan_metrics = 9; optional bool renew = 10 [default = false]; + // if we have returned limit_of_rows rows to client, then close the scanner. + optional uint32 limit_of_rows = 11 [default = 0]; } /** diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index 087576c7065..a550f8561dc 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -14145,15 +14145,15 @@ public final class ClientProtos { */ boolean getLoadColumnFamiliesOnDemand(); - // optional bool small = 14; + // optional bool small = 14 [deprecated = true]; /** - * optional bool small = 14; + * optional bool small = 14 [deprecated = true]; */ - boolean hasSmall(); + @java.lang.Deprecated boolean hasSmall(); /** - * optional bool small = 14; + * optional bool small = 14 [deprecated = true]; */ - boolean getSmall(); + @java.lang.Deprecated boolean getSmall(); // optional bool reversed = 15 [default = false]; /** @@ -14249,6 +14249,16 @@ public final class ClientProtos { * optional bool include_stop_row = 22 [default = false]; */ boolean getIncludeStopRow(); + + // optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT]; + /** + * optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT]; + */ + boolean hasReadType(); + /** + * optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT]; + */ + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType getReadType(); } /** * Protobuf type {@code hbase.pb.Scan} @@ -14453,6 +14463,17 @@ public final class ClientProtos { includeStopRow_ = input.readBool(); break; } + case 184: { + int rawValue = input.readEnum(); + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType value = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType.valueOf(rawValue); + if (value == null) { + unknownFields.mergeVarintField(23, rawValue); + } else { + bitField0_ |= 0x00080000; + readType_ = value; + } + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -14501,6 +14522,97 @@ public final class ClientProtos { return PARSER; } + /** + * Protobuf enum {@code hbase.pb.Scan.ReadType} + */ + public enum ReadType + implements com.google.protobuf.ProtocolMessageEnum { + /** + * DEFAULT = 0; + */ + DEFAULT(0, 0), + /** + * STREAM = 1; + */ + STREAM(1, 1), + /** + * PREAD = 2; + */ + PREAD(2, 2), + ; + + /** + * DEFAULT = 0; + */ + public static final int DEFAULT_VALUE = 0; + /** + * STREAM = 1; + */ + public static final int STREAM_VALUE = 1; + /** + * PREAD = 2; + */ + public static final int PREAD_VALUE = 2; + + + public final int getNumber() { return value; } + + public static ReadType valueOf(int value) { + switch (value) { + case 0: return DEFAULT; + case 1: return STREAM; + case 2: return PREAD; + default: return null; + } + } + + public static com.google.protobuf.Internal.EnumLiteMap + internalGetValueMap() { + return internalValueMap; + } + private static com.google.protobuf.Internal.EnumLiteMap + internalValueMap = + new com.google.protobuf.Internal.EnumLiteMap() { + public ReadType findValueByNumber(int number) { + return ReadType.valueOf(number); + } + }; + + public final com.google.protobuf.Descriptors.EnumValueDescriptor + getValueDescriptor() { + return getDescriptor().getValues().get(index); + } + public final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptorForType() { + return getDescriptor(); + } + public static final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDescriptor().getEnumTypes().get(0); + } + + private static final ReadType[] VALUES = values(); + + public static ReadType valueOf( + com.google.protobuf.Descriptors.EnumValueDescriptor desc) { + if (desc.getType() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "EnumValueDescriptor is not for this type."); + } + return VALUES[desc.getIndex()]; + } + + private final int index; + private final int value; + + private ReadType(int index, int value) { + this.index = index; + this.value = value; + } + + // @@protoc_insertion_point(enum_scope:hbase.pb.Scan.ReadType) + } + private int bitField0_; // repeated .hbase.pb.Column column = 1; public static final int COLUMN_FIELD_NUMBER = 1; @@ -14770,19 +14882,19 @@ public final class ClientProtos { return loadColumnFamiliesOnDemand_; } - // optional bool small = 14; + // optional bool small = 14 [deprecated = true]; public static final int SMALL_FIELD_NUMBER = 14; private boolean small_; /** - * optional bool small = 14; + * optional bool small = 14 [deprecated = true]; */ - public boolean hasSmall() { + @java.lang.Deprecated public boolean hasSmall() { return ((bitField0_ & 0x00000800) == 0x00000800); } /** - * optional bool small = 14; + * optional bool small = 14 [deprecated = true]; */ - public boolean getSmall() { + @java.lang.Deprecated public boolean getSmall() { return small_; } @@ -14934,6 +15046,22 @@ public final class ClientProtos { return includeStopRow_; } + // optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT]; + public static final int READTYPE_FIELD_NUMBER = 23; + private org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType readType_; + /** + * optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT]; + */ + public boolean hasReadType() { + return ((bitField0_ & 0x00080000) == 0x00080000); + } + /** + * optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT]; + */ + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType getReadType() { + return readType_; + } + private void initFields() { column_ = java.util.Collections.emptyList(); attribute_ = java.util.Collections.emptyList(); @@ -14957,6 +15085,7 @@ public final class ClientProtos { mvccReadPoint_ = 0L; includeStartRow_ = true; includeStopRow_ = false; + readType_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType.DEFAULT; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -15060,6 +15189,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00040000) == 0x00040000)) { output.writeBool(22, includeStopRow_); } + if (((bitField0_ & 0x00080000) == 0x00080000)) { + output.writeEnum(23, readType_.getNumber()); + } getUnknownFields().writeTo(output); } @@ -15157,6 +15289,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(22, includeStopRow_); } + if (((bitField0_ & 0x00080000) == 0x00080000)) { + size += com.google.protobuf.CodedOutputStream + .computeEnumSize(23, readType_.getNumber()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -15281,6 +15417,11 @@ public final class ClientProtos { result = result && (getIncludeStopRow() == other.getIncludeStopRow()); } + result = result && (hasReadType() == other.hasReadType()); + if (hasReadType()) { + result = result && + (getReadType() == other.getReadType()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -15382,6 +15523,10 @@ public final class ClientProtos { hash = (37 * hash) + INCLUDE_STOP_ROW_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getIncludeStopRow()); } + if (hasReadType()) { + hash = (37 * hash) + READTYPE_FIELD_NUMBER; + hash = (53 * hash) + hashEnum(getReadType()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -15571,6 +15716,8 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00100000); includeStopRow_ = false; bitField0_ = (bitField0_ & ~0x00200000); + readType_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType.DEFAULT; + bitField0_ = (bitField0_ & ~0x00400000); return this; } @@ -15710,6 +15857,10 @@ public final class ClientProtos { to_bitField0_ |= 0x00040000; } result.includeStopRow_ = includeStopRow_; + if (((from_bitField0_ & 0x00400000) == 0x00400000)) { + to_bitField0_ |= 0x00080000; + } + result.readType_ = readType_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -15861,6 +16012,9 @@ public final class ClientProtos { if (other.hasIncludeStopRow()) { setIncludeStopRow(other.getIncludeStopRow()); } + if (other.hasReadType()) { + setReadType(other.getReadType()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -16945,33 +17099,33 @@ public final class ClientProtos { return this; } - // optional bool small = 14; + // optional bool small = 14 [deprecated = true]; private boolean small_ ; /** - * optional bool small = 14; + * optional bool small = 14 [deprecated = true]; */ - public boolean hasSmall() { + @java.lang.Deprecated public boolean hasSmall() { return ((bitField0_ & 0x00002000) == 0x00002000); } /** - * optional bool small = 14; + * optional bool small = 14 [deprecated = true]; */ - public boolean getSmall() { + @java.lang.Deprecated public boolean getSmall() { return small_; } /** - * optional bool small = 14; + * optional bool small = 14 [deprecated = true]; */ - public Builder setSmall(boolean value) { + @java.lang.Deprecated public Builder setSmall(boolean value) { bitField0_ |= 0x00002000; small_ = value; onChanged(); return this; } /** - * optional bool small = 14; + * optional bool small = 14 [deprecated = true]; */ - public Builder clearSmall() { + @java.lang.Deprecated public Builder clearSmall() { bitField0_ = (bitField0_ & ~0x00002000); small_ = false; onChanged(); @@ -17452,6 +17606,42 @@ public final class ClientProtos { return this; } + // optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT]; + private org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType readType_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType.DEFAULT; + /** + * optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT]; + */ + public boolean hasReadType() { + return ((bitField0_ & 0x00400000) == 0x00400000); + } + /** + * optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT]; + */ + public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType getReadType() { + return readType_; + } + /** + * optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT]; + */ + public Builder setReadType(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00400000; + readType_ = value; + onChanged(); + return this; + } + /** + * optional .hbase.pb.Scan.ReadType readType = 23 [default = DEFAULT]; + */ + public Builder clearReadType() { + bitField0_ = (bitField0_ & ~0x00400000); + readType_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.ReadType.DEFAULT; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:hbase.pb.Scan) } @@ -17573,6 +17763,24 @@ public final class ClientProtos { * optional bool renew = 10 [default = false]; */ boolean getRenew(); + + // optional uint32 limit_of_rows = 11 [default = 0]; + /** + * optional uint32 limit_of_rows = 11 [default = 0]; + * + *
+     * if we have returned limit_of_rows rows to client, then close the scanner.
+     * 
+ */ + boolean hasLimitOfRows(); + /** + * optional uint32 limit_of_rows = 11 [default = 0]; + * + *
+     * if we have returned limit_of_rows rows to client, then close the scanner.
+     * 
+ */ + int getLimitOfRows(); } /** * Protobuf type {@code hbase.pb.ScanRequest} @@ -17704,6 +17912,11 @@ public final class ClientProtos { renew_ = input.readBool(); break; } + case 88: { + bitField0_ |= 0x00000400; + limitOfRows_ = input.readUInt32(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -17916,6 +18129,30 @@ public final class ClientProtos { return renew_; } + // optional uint32 limit_of_rows = 11 [default = 0]; + public static final int LIMIT_OF_ROWS_FIELD_NUMBER = 11; + private int limitOfRows_; + /** + * optional uint32 limit_of_rows = 11 [default = 0]; + * + *
+     * if we have returned limit_of_rows rows to client, then close the scanner.
+     * 
+ */ + public boolean hasLimitOfRows() { + return ((bitField0_ & 0x00000400) == 0x00000400); + } + /** + * optional uint32 limit_of_rows = 11 [default = 0]; + * + *
+     * if we have returned limit_of_rows rows to client, then close the scanner.
+     * 
+ */ + public int getLimitOfRows() { + return limitOfRows_; + } + private void initFields() { region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance(); @@ -17927,6 +18164,7 @@ public final class ClientProtos { clientHandlesHeartbeats_ = false; trackScanMetrics_ = false; renew_ = false; + limitOfRows_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -17982,6 +18220,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000200) == 0x00000200)) { output.writeBool(10, renew_); } + if (((bitField0_ & 0x00000400) == 0x00000400)) { + output.writeUInt32(11, limitOfRows_); + } getUnknownFields().writeTo(output); } @@ -18031,6 +18272,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(10, renew_); } + if (((bitField0_ & 0x00000400) == 0x00000400)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(11, limitOfRows_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -18104,6 +18349,11 @@ public final class ClientProtos { result = result && (getRenew() == other.getRenew()); } + result = result && (hasLimitOfRows() == other.hasLimitOfRows()); + if (hasLimitOfRows()) { + result = result && (getLimitOfRows() + == other.getLimitOfRows()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -18157,6 +18407,10 @@ public final class ClientProtos { hash = (37 * hash) + RENEW_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getRenew()); } + if (hasLimitOfRows()) { + hash = (37 * hash) + LIMIT_OF_ROWS_FIELD_NUMBER; + hash = (53 * hash) + getLimitOfRows(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -18309,6 +18563,8 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000100); renew_ = false; bitField0_ = (bitField0_ & ~0x00000200); + limitOfRows_ = 0; + bitField0_ = (bitField0_ & ~0x00000400); return this; } @@ -18385,6 +18641,10 @@ public final class ClientProtos { to_bitField0_ |= 0x00000200; } result.renew_ = renew_; + if (((from_bitField0_ & 0x00000400) == 0x00000400)) { + to_bitField0_ |= 0x00000400; + } + result.limitOfRows_ = limitOfRows_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -18431,6 +18691,9 @@ public final class ClientProtos { if (other.hasRenew()) { setRenew(other.getRenew()); } + if (other.hasLimitOfRows()) { + setLimitOfRows(other.getLimitOfRows()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -18968,6 +19231,55 @@ public final class ClientProtos { return this; } + // optional uint32 limit_of_rows = 11 [default = 0]; + private int limitOfRows_ ; + /** + * optional uint32 limit_of_rows = 11 [default = 0]; + * + *
+       * if we have returned limit_of_rows rows to client, then close the scanner.
+       * 
+ */ + public boolean hasLimitOfRows() { + return ((bitField0_ & 0x00000400) == 0x00000400); + } + /** + * optional uint32 limit_of_rows = 11 [default = 0]; + * + *
+       * if we have returned limit_of_rows rows to client, then close the scanner.
+       * 
+ */ + public int getLimitOfRows() { + return limitOfRows_; + } + /** + * optional uint32 limit_of_rows = 11 [default = 0]; + * + *
+       * if we have returned limit_of_rows rows to client, then close the scanner.
+       * 
+ */ + public Builder setLimitOfRows(int value) { + bitField0_ |= 0x00000400; + limitOfRows_ = value; + onChanged(); + return this; + } + /** + * optional uint32 limit_of_rows = 11 [default = 0]; + * + *
+       * if we have returned limit_of_rows rows to client, then close the scanner.
+       * 
+ */ + public Builder clearLimitOfRows() { + bitField0_ = (bitField0_ & ~0x00000400); + limitOfRows_ = 0; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:hbase.pb.ScanRequest) } @@ -39912,7 +40224,7 @@ public final class ClientProtos { "tion\030\003 \001(\0132\023.hbase.pb.Condition\022\023\n\013nonce" + "_group\030\004 \001(\004\"E\n\016MutateResponse\022 \n\006result" + "\030\001 \001(\0132\020.hbase.pb.Result\022\021\n\tprocessed\030\002 " + - "\001(\010\"\233\005\n\004Scan\022 \n\006column\030\001 \003(\0132\020.hbase.pb." + + "\001(\010\"\203\006\n\004Scan\022 \n\006column\030\001 \003(\0132\020.hbase.pb." + "Column\022*\n\tattribute\030\002 \003(\0132\027.hbase.pb.Nam" + "eBytesPair\022\021\n\tstart_row\030\003 \001(\014\022\020\n\010stop_ro" + "w\030\004 \001(\014\022 \n\006filter\030\005 \001(\0132\020.hbase.pb.Filte" + @@ -39921,104 +40233,108 @@ public final class ClientProtos { "cks\030\010 \001(\010:\004true\022\022\n\nbatch_size\030\t \001(\r\022\027\n\017m" + "ax_result_size\030\n \001(\004\022\023\n\013store_limit\030\013 \001(" + "\r\022\024\n\014store_offset\030\014 \001(\r\022&\n\036load_column_f" + - "amilies_on_demand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\022\027" + - "\n\010reversed\030\017 \001(\010:\005false\0222\n\013consistency\030\020" + - " \001(\0162\025.hbase.pb.Consistency:\006STRONG\022\017\n\007c" + - "aching\030\021 \001(\r\022\035\n\025allow_partial_results\030\022 " + - "\001(\010\0226\n\rcf_time_range\030\023 \003(\0132\037.hbase.pb.Co" + - "lumnFamilyTimeRange\022\032\n\017mvcc_read_point\030\024", - " \001(\004:\0010\022\037\n\021include_start_row\030\025 \001(\010:\004true" + - "\022\037\n\020include_stop_row\030\026 \001(\010:\005false\"\246\002\n\013Sc" + - "anRequest\022)\n\006region\030\001 \001(\0132\031.hbase.pb.Reg" + - "ionSpecifier\022\034\n\004scan\030\002 \001(\0132\016.hbase.pb.Sc" + - "an\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016number_of_rows" + - "\030\004 \001(\r\022\025\n\rclose_scanner\030\005 \001(\010\022\025\n\rnext_ca" + - "ll_seq\030\006 \001(\004\022\037\n\027client_handles_partials\030" + - "\007 \001(\010\022!\n\031client_handles_heartbeats\030\010 \001(\010" + - "\022\032\n\022track_scan_metrics\030\t \001(\010\022\024\n\005renew\030\n " + - "\001(\010:\005false\"\266\002\n\014ScanResponse\022\030\n\020cells_per", - "_result\030\001 \003(\r\022\022\n\nscanner_id\030\002 \001(\004\022\024\n\014mor" + - "e_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022!\n\007results\030" + - "\005 \003(\0132\020.hbase.pb.Result\022\r\n\005stale\030\006 \001(\010\022\037" + - "\n\027partial_flag_per_result\030\007 \003(\010\022\036\n\026more_" + - "results_in_region\030\010 \001(\010\022\031\n\021heartbeat_mes" + - "sage\030\t \001(\010\022+\n\014scan_metrics\030\n \001(\0132\025.hbase" + - ".pb.ScanMetrics\022\032\n\017mvcc_read_point\030\013 \001(\004" + - ":\0010\"\240\002\n\024BulkLoadHFileRequest\022)\n\006region\030\001" + - " \002(\0132\031.hbase.pb.RegionSpecifier\022>\n\013famil" + - "y_path\030\002 \003(\0132).hbase.pb.BulkLoadHFileReq", - "uest.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\022" + - "+\n\010fs_token\030\004 \001(\0132\031.hbase.pb.DelegationT" + - "oken\022\022\n\nbulk_token\030\005 \001(\t\022\030\n\tcopy_file\030\006 " + - "\001(\010:\005false\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014" + - "\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022" + - "\016\n\006loaded\030\001 \002(\010\"V\n\017DelegationToken\022\022\n\nid" + - "entifier\030\001 \001(\014\022\020\n\010password\030\002 \001(\014\022\014\n\004kind" + - "\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026PrepareBulkLo" + - "adRequest\022\'\n\ntable_name\030\001 \002(\0132\023.hbase.pb" + - ".TableName\022)\n\006region\030\002 \001(\0132\031.hbase.pb.Re", - "gionSpecifier\"-\n\027PrepareBulkLoadResponse" + - "\022\022\n\nbulk_token\030\001 \002(\t\"W\n\026CleanupBulkLoadR" + - "equest\022\022\n\nbulk_token\030\001 \002(\t\022)\n\006region\030\002 \001" + - "(\0132\031.hbase.pb.RegionSpecifier\"\031\n\027Cleanup" + - "BulkLoadResponse\"a\n\026CoprocessorServiceCa" + - "ll\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n" + - "\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030C" + - "oprocessorServiceResult\022&\n\005value\030\001 \001(\0132\027" + - ".hbase.pb.NameBytesPair\"v\n\031CoprocessorSe" + - "rviceRequest\022)\n\006region\030\001 \002(\0132\031.hbase.pb.", - "RegionSpecifier\022.\n\004call\030\002 \002(\0132 .hbase.pb" + - ".CoprocessorServiceCall\"o\n\032CoprocessorSe" + - "rviceResponse\022)\n\006region\030\001 \002(\0132\031.hbase.pb" + - ".RegionSpecifier\022&\n\005value\030\002 \002(\0132\027.hbase." + - "pb.NameBytesPair\"\226\001\n\006Action\022\r\n\005index\030\001 \001" + - "(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.Mutation" + - "Proto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\0226\n\014ser" + - "vice_call\030\004 \001(\0132 .hbase.pb.CoprocessorSe" + - "rviceCall\"k\n\014RegionAction\022)\n\006region\030\001 \002(" + - "\0132\031.hbase.pb.RegionSpecifier\022\016\n\006atomic\030\002", - " \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Action\"c" + - "\n\017RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:" + - "\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022compacti" + - "onPressure\030\003 \001(\005:\0010\"j\n\024MultiRegionLoadSt" + - "ats\022)\n\006region\030\001 \003(\0132\031.hbase.pb.RegionSpe" + - "cifier\022\'\n\004stat\030\002 \003(\0132\031.hbase.pb.RegionLo" + - "adStats\"\336\001\n\021ResultOrException\022\r\n\005index\030\001" + - " \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Result\022*" + - "\n\texception\030\003 \001(\0132\027.hbase.pb.NameBytesPa" + - "ir\022:\n\016service_result\030\004 \001(\0132\".hbase.pb.Co", - "processorServiceResult\0220\n\tloadStats\030\005 \001(" + - "\0132\031.hbase.pb.RegionLoadStatsB\002\030\001\"x\n\022Regi" + - "onActionResult\0226\n\021resultOrException\030\001 \003(" + - "\0132\033.hbase.pb.ResultOrException\022*\n\texcept" + - "ion\030\002 \001(\0132\027.hbase.pb.NameBytesPair\"x\n\014Mu" + - "ltiRequest\022,\n\014regionAction\030\001 \003(\0132\026.hbase" + - ".pb.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\t" + - "condition\030\003 \001(\0132\023.hbase.pb.Condition\"\226\001\n" + - "\rMultiResponse\0228\n\022regionActionResult\030\001 \003" + - "(\0132\034.hbase.pb.RegionActionResult\022\021\n\tproc", - "essed\030\002 \001(\010\0228\n\020regionStatistics\030\003 \001(\0132\036." + - "hbase.pb.MultiRegionLoadStats*\'\n\013Consist" + - "ency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\263\005\n\rClien" + - "tService\0222\n\003Get\022\024.hbase.pb.GetRequest\032\025." + - "hbase.pb.GetResponse\022;\n\006Mutate\022\027.hbase.p" + - "b.MutateRequest\032\030.hbase.pb.MutateRespons" + - "e\0225\n\004Scan\022\025.hbase.pb.ScanRequest\032\026.hbase" + - ".pb.ScanResponse\022P\n\rBulkLoadHFile\022\036.hbas" + - "e.pb.BulkLoadHFileRequest\032\037.hbase.pb.Bul" + - "kLoadHFileResponse\022V\n\017PrepareBulkLoad\022 .", - "hbase.pb.PrepareBulkLoadRequest\032!.hbase." + - "pb.PrepareBulkLoadResponse\022V\n\017CleanupBul" + - "kLoad\022 .hbase.pb.CleanupBulkLoadRequest\032" + - "!.hbase.pb.CleanupBulkLoadResponse\022X\n\013Ex" + - "ecService\022#.hbase.pb.CoprocessorServiceR" + - "equest\032$.hbase.pb.CoprocessorServiceResp" + - "onse\022d\n\027ExecRegionServerService\022#.hbase." + - "pb.CoprocessorServiceRequest\032$.hbase.pb." + - "CoprocessorServiceResponse\0228\n\005Multi\022\026.hb" + - "ase.pb.MultiRequest\032\027.hbase.pb.MultiResp", - "onseBB\n*org.apache.hadoop.hbase.protobuf" + - ".generatedB\014ClientProtosH\001\210\001\001\240\001\001" + "amilies_on_demand\030\r \001(\010\022\021\n\005small\030\016 \001(\010B\002" + + "\030\001\022\027\n\010reversed\030\017 \001(\010:\005false\0222\n\013consisten" + + "cy\030\020 \001(\0162\025.hbase.pb.Consistency:\006STRONG\022" + + "\017\n\007caching\030\021 \001(\r\022\035\n\025allow_partial_result" + + "s\030\022 \001(\010\0226\n\rcf_time_range\030\023 \003(\0132\037.hbase.p" + + "b.ColumnFamilyTimeRange\022\032\n\017mvcc_read_poi", + "nt\030\024 \001(\004:\0010\022\037\n\021include_start_row\030\025 \001(\010:\004" + + "true\022\037\n\020include_stop_row\030\026 \001(\010:\005false\0222\n" + + "\010readType\030\027 \001(\0162\027.hbase.pb.Scan.ReadType" + + ":\007DEFAULT\".\n\010ReadType\022\013\n\007DEFAULT\020\000\022\n\n\006ST" + + "REAM\020\001\022\t\n\005PREAD\020\002\"\300\002\n\013ScanRequest\022)\n\006reg" + + "ion\030\001 \001(\0132\031.hbase.pb.RegionSpecifier\022\034\n\004" + + "scan\030\002 \001(\0132\016.hbase.pb.Scan\022\022\n\nscanner_id" + + "\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rclose_" + + "scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(\004\022\037\n\027" + + "client_handles_partials\030\007 \001(\010\022!\n\031client_", + "handles_heartbeats\030\010 \001(\010\022\032\n\022track_scan_m" + + "etrics\030\t \001(\010\022\024\n\005renew\030\n \001(\010:\005false\022\030\n\rli" + + "mit_of_rows\030\013 \001(\r:\0010\"\266\002\n\014ScanResponse\022\030\n" + + "\020cells_per_result\030\001 \003(\r\022\022\n\nscanner_id\030\002 " + + "\001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022!" + + "\n\007results\030\005 \003(\0132\020.hbase.pb.Result\022\r\n\005sta" + + "le\030\006 \001(\010\022\037\n\027partial_flag_per_result\030\007 \003(" + + "\010\022\036\n\026more_results_in_region\030\010 \001(\010\022\031\n\021hea" + + "rtbeat_message\030\t \001(\010\022+\n\014scan_metrics\030\n \001" + + "(\0132\025.hbase.pb.ScanMetrics\022\032\n\017mvcc_read_p", + "oint\030\013 \001(\004:\0010\"\240\002\n\024BulkLoadHFileRequest\022)" + + "\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifie" + + "r\022>\n\013family_path\030\002 \003(\0132).hbase.pb.BulkLo" + + "adHFileRequest.FamilyPath\022\026\n\016assign_seq_" + + "num\030\003 \001(\010\022+\n\010fs_token\030\004 \001(\0132\031.hbase.pb.D" + + "elegationToken\022\022\n\nbulk_token\030\005 \001(\t\022\030\n\tco" + + "py_file\030\006 \001(\010:\005false\032*\n\nFamilyPath\022\016\n\006fa" + + "mily\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFil" + + "eResponse\022\016\n\006loaded\030\001 \002(\010\"V\n\017DelegationT" + + "oken\022\022\n\nidentifier\030\001 \001(\014\022\020\n\010password\030\002 \001", + "(\014\022\014\n\004kind\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026Pre" + + "pareBulkLoadRequest\022\'\n\ntable_name\030\001 \002(\0132" + + "\023.hbase.pb.TableName\022)\n\006region\030\002 \001(\0132\031.h" + + "base.pb.RegionSpecifier\"-\n\027PrepareBulkLo" + + "adResponse\022\022\n\nbulk_token\030\001 \002(\t\"W\n\026Cleanu" + + "pBulkLoadRequest\022\022\n\nbulk_token\030\001 \002(\t\022)\n\006" + + "region\030\002 \001(\0132\031.hbase.pb.RegionSpecifier\"" + + "\031\n\027CleanupBulkLoadResponse\"a\n\026Coprocesso" + + "rServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_nam" + + "e\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007request\030", + "\004 \002(\014\"B\n\030CoprocessorServiceResult\022&\n\005val" + + "ue\030\001 \001(\0132\027.hbase.pb.NameBytesPair\"v\n\031Cop" + + "rocessorServiceRequest\022)\n\006region\030\001 \002(\0132\031" + + ".hbase.pb.RegionSpecifier\022.\n\004call\030\002 \002(\0132" + + " .hbase.pb.CoprocessorServiceCall\"o\n\032Cop" + + "rocessorServiceResponse\022)\n\006region\030\001 \002(\0132" + + "\031.hbase.pb.RegionSpecifier\022&\n\005value\030\002 \002(" + + "\0132\027.hbase.pb.NameBytesPair\"\226\001\n\006Action\022\r\n" + + "\005index\030\001 \001(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase.p" + + "b.MutationProto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.", + "Get\0226\n\014service_call\030\004 \001(\0132 .hbase.pb.Cop" + + "rocessorServiceCall\"k\n\014RegionAction\022)\n\006r" + + "egion\030\001 \002(\0132\031.hbase.pb.RegionSpecifier\022\016" + + "\n\006atomic\030\002 \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase.p" + + "b.Action\"c\n\017RegionLoadStats\022\027\n\014memstoreL" + + "oad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035" + + "\n\022compactionPressure\030\003 \001(\005:\0010\"j\n\024MultiRe" + + "gionLoadStats\022)\n\006region\030\001 \003(\0132\031.hbase.pb" + + ".RegionSpecifier\022\'\n\004stat\030\002 \003(\0132\031.hbase.p" + + "b.RegionLoadStats\"\336\001\n\021ResultOrException\022", + "\r\n\005index\030\001 \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase.p" + + "b.Result\022*\n\texception\030\003 \001(\0132\027.hbase.pb.N" + + "ameBytesPair\022:\n\016service_result\030\004 \001(\0132\".h" + + "base.pb.CoprocessorServiceResult\0220\n\tload" + + "Stats\030\005 \001(\0132\031.hbase.pb.RegionLoadStatsB\002" + + "\030\001\"x\n\022RegionActionResult\0226\n\021resultOrExce" + + "ption\030\001 \003(\0132\033.hbase.pb.ResultOrException" + + "\022*\n\texception\030\002 \001(\0132\027.hbase.pb.NameBytes" + + "Pair\"x\n\014MultiRequest\022,\n\014regionAction\030\001 \003" + + "(\0132\026.hbase.pb.RegionAction\022\022\n\nnonceGroup", + "\030\002 \001(\004\022&\n\tcondition\030\003 \001(\0132\023.hbase.pb.Con" + + "dition\"\226\001\n\rMultiResponse\0228\n\022regionAction" + + "Result\030\001 \003(\0132\034.hbase.pb.RegionActionResu" + + "lt\022\021\n\tprocessed\030\002 \001(\010\0228\n\020regionStatistic" + + "s\030\003 \001(\0132\036.hbase.pb.MultiRegionLoadStats*" + + "\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\001" + + "2\263\005\n\rClientService\0222\n\003Get\022\024.hbase.pb.Get" + + "Request\032\025.hbase.pb.GetResponse\022;\n\006Mutate" + + "\022\027.hbase.pb.MutateRequest\032\030.hbase.pb.Mut" + + "ateResponse\0225\n\004Scan\022\025.hbase.pb.ScanReque", + "st\032\026.hbase.pb.ScanResponse\022P\n\rBulkLoadHF" + + "ile\022\036.hbase.pb.BulkLoadHFileRequest\032\037.hb" + + "ase.pb.BulkLoadHFileResponse\022V\n\017PrepareB" + + "ulkLoad\022 .hbase.pb.PrepareBulkLoadReques" + + "t\032!.hbase.pb.PrepareBulkLoadResponse\022V\n\017" + + "CleanupBulkLoad\022 .hbase.pb.CleanupBulkLo" + + "adRequest\032!.hbase.pb.CleanupBulkLoadResp" + + "onse\022X\n\013ExecService\022#.hbase.pb.Coprocess" + + "orServiceRequest\032$.hbase.pb.CoprocessorS" + + "erviceResponse\022d\n\027ExecRegionServerServic", + "e\022#.hbase.pb.CoprocessorServiceRequest\032$" + + ".hbase.pb.CoprocessorServiceResponse\0228\n\005" + + "Multi\022\026.hbase.pb.MultiRequest\032\027.hbase.pb" + + ".MultiResponseBB\n*org.apache.hadoop.hbas" + + "e.protobuf.generatedB\014ClientProtosH\001\210\001\001\240" + + "\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -40108,13 +40424,13 @@ public final class ClientProtos { internal_static_hbase_pb_Scan_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_Scan_descriptor, - new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", "AllowPartialResults", "CfTimeRange", "MvccReadPoint", "IncludeStartRow", "IncludeStopRow", }); + new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", "Reversed", "Consistency", "Caching", "AllowPartialResults", "CfTimeRange", "MvccReadPoint", "IncludeStartRow", "IncludeStopRow", "ReadType", }); internal_static_hbase_pb_ScanRequest_descriptor = getDescriptor().getMessageTypes().get(12); internal_static_hbase_pb_ScanRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_ScanRequest_descriptor, - new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", "Renew", }); + new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", "Renew", "LimitOfRows", }); internal_static_hbase_pb_ScanResponse_descriptor = getDescriptor().getMessageTypes().get(13); internal_static_hbase_pb_ScanResponse_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index ae932f7d06c..5cf66c2a092 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -249,7 +249,7 @@ message Scan { optional uint32 store_limit = 11; optional uint32 store_offset = 12; optional bool load_column_families_on_demand = 13; /* DO NOT add defaults to load_column_families_on_demand. */ - optional bool small = 14; + optional bool small = 14 [deprecated = true]; optional bool reversed = 15 [default = false]; optional Consistency consistency = 16 [default = STRONG]; optional uint32 caching = 17; @@ -258,6 +258,13 @@ message Scan { optional uint64 mvcc_read_point = 20 [default = 0]; optional bool include_start_row = 21 [default = true]; optional bool include_stop_row = 22 [default = false]; + + enum ReadType { + DEFAULT = 0; + STREAM = 1; + PREAD = 2; + } + optional ReadType readType = 23 [default = DEFAULT]; } /** @@ -282,6 +289,8 @@ message ScanRequest { optional bool client_handles_heartbeats = 8; optional bool track_scan_metrics = 9; optional bool renew = 10 [default = false]; + // if we have returned limit_of_rows rows to client, then close the scanner. + optional uint32 limit_of_rows = 11 [default = 0]; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index a072dce6c01..9847dfe4412 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1133,6 +1133,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } + @VisibleForTesting + public int getScannersCount() { + return scanners.size(); + } + public RegionScanner getScanner(long scannerId) { String scannerIdString = Long.toString(scannerId); @@ -3014,6 +3019,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler, RegionScanner scanner = rsh.s; boolean moreResults = true; boolean moreResultsInRegion = true; + // this is the limit of rows for this scan, if we the number of rows reach this value, we will + // close the scanner. + int limitOfRows; + if (request.hasLimitOfRows()) { + limitOfRows = request.getLimitOfRows(); + rows = Math.min(rows, limitOfRows); + } else { + limitOfRows = -1; + } MutableObject lastBlock = new MutableObject(); boolean scannerClosed = false; try { @@ -3046,6 +3060,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // with the old scan implementation where we just ignore the returned results if moreResults // is false. Can remove the isEmpty check after we get rid of the old implementation. moreResults = false; + } else if (limitOfRows > 0 && results.size() >= limitOfRows + && !results.get(results.size() - 1).isPartial()) { + // if we have reached the limit of rows + moreResults = false; } addResults(builder, results, (HBaseRpcController) controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()), diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 7e08eca2b01..8c48aefa0d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -127,7 +127,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner protected Cell lastTop = null; // A flag whether use pread for scan - private boolean scanUsePread = false; + private final boolean scanUsePread; // Indicates whether there was flush during the course of the scan protected volatile boolean flushed = false; // generally we get one file from a flush @@ -168,7 +168,21 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner this.useRowColBloom = numCol > 1 || (!get && numCol == 1); this.maxRowSize = scanInfo.getTableMaxRowSize(); - this.scanUsePread = scan.isSmall()? true: scanInfo.isUsePread(); + if (get) { + this.scanUsePread = true; + } else { + switch (scan.getReadType()) { + case STREAM: + this.scanUsePread = false; + break; + case PREAD: + this.scanUsePread = true; + break; + default: + this.scanUsePread = scanInfo.isUsePread(); + break; + } + } this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck(); // Parallel seeking is on if the config allows and more there is more than one store file. if (this.store != null && this.store.getStorefilesCount() > 1) { @@ -348,10 +362,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner * @return list of scanners to seek */ protected List getScannersNoCompaction() throws IOException { - final boolean isCompaction = false; - boolean usePread = get || scanUsePread; - return selectScannersFrom(store.getScanners(cacheBlocks, get, usePread, - isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt)); + return selectScannersFrom(store.getScanners(cacheBlocks, get, scanUsePread, false, matcher, + scan.getStartRow(), scan.getStopRow(), this.readPt)); } /** @@ -803,18 +815,14 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } protected void resetScannerStack(Cell lastTopKey) throws IOException { - /* When we have the scan object, should we not pass it to getScanners() - * to get a limited set of scanners? We did so in the constructor and we - * could have done it now by storing the scan object from the constructor - */ - - final boolean isCompaction = false; - boolean usePread = get || scanUsePread; + // When we have the scan object, should we not pass it to getScanners() to get a limited set of + // scanners? We did so in the constructor and we could have done it now by storing the scan + // object from the constructor List scanners = null; try { flushLock.lock(); - scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread, - isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true)); + scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, + scanUsePread, false, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true)); // Clear the current set of flushed store files so that they don't get added again flushedStoreFiles.clear(); } finally { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java index a1d926d5672..b80efae0d4f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java @@ -95,6 +95,12 @@ public abstract class AbstractTestAsyncTableScan { @Test public void testScanAll() throws Exception { List results = doScan(createScan()); + // make sure all scanners are closed at RS side + TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer()) + .forEach(rs -> assertEquals( + "The scanner count of " + rs.getServerName() + " is " + + rs.getRSRpcServices().getScannersCount(), + 0, rs.getRSRpcServices().getScannersCount())); assertEquals(COUNT, results.size()); IntStream.range(0, COUNT).forEach(i -> { Result result = results.get(i); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java similarity index 62% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java index 3737af2681a..a9a3e4362f9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java @@ -26,6 +26,7 @@ import java.util.concurrent.ForkJoinPool; import java.util.function.Supplier; import java.util.stream.IntStream; +import org.apache.hadoop.hbase.client.Scan.ReadType; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -38,11 +39,20 @@ import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) @Category({ MediumTests.class, ClientTests.class }) -public class TestAsyncTableSmallScan extends AbstractTestAsyncTableScan { +public class TestAsyncTableScanAll extends AbstractTestAsyncTableScan { - @Parameter + @Parameter(0) + public String tableType; + + @Parameter(1) public Supplier getTable; + @Parameter(2) + public String scanType; + + @Parameter(3) + public Supplier scanCreator; + private static RawAsyncTable getRawTable() { return ASYNC_CONN.getRawTable(TABLE_NAME); } @@ -51,24 +61,37 @@ public class TestAsyncTableSmallScan extends AbstractTestAsyncTableScan { return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); } - @Parameters + private static Scan createNormalScan() { + return new Scan(); + } + + // test if we can handle partial result when open scanner. + private static Scan createSmallResultSizeScan() { + return new Scan().setMaxResultSize(1); + } + + @Parameters(name = "{index}: table={0}, scan={2}") public static List params() { - return Arrays.asList(new Supplier[] { TestAsyncTableSmallScan::getRawTable }, - new Supplier[] { TestAsyncTableSmallScan::getTable }); + Supplier rawTable = TestAsyncTableScanAll::getRawTable; + Supplier normalTable = TestAsyncTableScanAll::getTable; + Supplier normalScan = TestAsyncTableScanAll::createNormalScan; + Supplier smallResultSizeScan = TestAsyncTableScanAll::createSmallResultSizeScan; + return Arrays.asList(new Object[] { "raw", rawTable, "normal", normalScan }, + new Object[] { "raw", rawTable, "smallResultSize", smallResultSizeScan }, + new Object[] { "normal", normalTable, "normal", normalScan }, + new Object[] { "normal", normalTable, "smallResultSize", smallResultSizeScan }); } @Test public void testScanWithLimit() throws InterruptedException, ExecutionException { - AsyncTableBase table = getTable.get(); int start = 111; int stop = 888; int limit = 300; - List results = - table - .smallScan(new Scan(Bytes.toBytes(String.format("%03d", start))) - .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true), - limit) - .get(); + List results = getTable.get() + .scanAll(scanCreator.get().withStartRow(Bytes.toBytes(String.format("%03d", start))) + .withStopRow(Bytes.toBytes(String.format("%03d", stop))).setLimit(limit) + .setReadType(ReadType.PREAD)) + .get(); assertEquals(limit, results.size()); IntStream.range(0, limit).forEach(i -> { Result result = results.get(i); @@ -80,14 +103,14 @@ public class TestAsyncTableSmallScan extends AbstractTestAsyncTableScan { @Test public void testReversedScanWithLimit() throws InterruptedException, ExecutionException { - AsyncTableBase table = getTable.get(); int start = 888; int stop = 111; int limit = 300; - List results = table.smallScan( - new Scan(Bytes.toBytes(String.format("%03d", start))) - .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true).setReversed(true), - limit).get(); + List results = getTable.get() + .scanAll(scanCreator.get().withStartRow(Bytes.toBytes(String.format("%03d", start))) + .withStopRow(Bytes.toBytes(String.format("%03d", stop))).setLimit(limit) + .setReadType(ReadType.PREAD).setReversed(true)) + .get(); assertEquals(limit, results.size()); IntStream.range(0, limit).forEach(i -> { Result result = results.get(i); @@ -99,11 +122,11 @@ public class TestAsyncTableSmallScan extends AbstractTestAsyncTableScan { @Override protected Scan createScan() { - return new Scan().setSmall(true); + return scanCreator.get(); } @Override protected List doScan(Scan scan) throws Exception { - return getTable.get().smallScan(scan).get(); + return getTable.get().scanAll(scan).get(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java index 9f3970b9255..a8ef353f300 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java @@ -56,11 +56,6 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan { return true; } - @Override - public boolean onHeartbeat() { - return true; - } - @Override public synchronized void onError(Throwable error) { finished = true; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestMultiRowRangeFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestMultiRowRangeFilter.java index 3c5fe27a94f..1b3c4e67105 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestMultiRowRangeFilter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestMultiRowRangeFilter.java @@ -28,6 +28,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Put; @@ -369,7 +370,10 @@ public class TestMultiRowRangeFilter { @Test public void testMultiRowRangeFilterWithExclusive() throws IOException { tableName = TableName.valueOf("testMultiRowRangeFilterWithExclusive"); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 6000000); Table ht = TEST_UTIL.createTable(tableName, family, Integer.MAX_VALUE); + ht.setReadRpcTimeout(600000); + ht.setOperationTimeout(6000000); generateRows(numRows, ht, family, qf, value); Scan scan = new Scan();