diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index c4e84481915..abcc26eeb8b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.numberOfIndividualRows;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
@@ -31,6 +32,7 @@ import io.netty.util.Timeout;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -202,7 +204,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private ScanResponse resp;
- private int numValidResults;
+ private int numberOfIndividualRows;
// If the scan is suspended successfully, we need to do lease renewal to prevent it being closed
// by RS due to lease expire. It is a one-time timer task so we need to schedule a new task
@@ -219,7 +221,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
// resume is called after suspend, then it is also safe to just reference resp and
// numValidResults after the synchronized block as no one will change it anymore.
ScanResponse localResp;
- int localNumValidResults;
+ int localNumberOfIndividualRows;
synchronized (this) {
if (state == ScanResumerState.INITIALIZED) {
// user calls this method before we call prepare, so just set the state to
@@ -236,9 +238,9 @@ class AsyncScanSingleRegionRpcRetryingCaller {
leaseRenewer.cancel();
}
localResp = this.resp;
- localNumValidResults = this.numValidResults;
+ localNumberOfIndividualRows = this.numberOfIndividualRows;
}
- completeOrNext(localResp, localNumValidResults);
+ completeOrNext(localResp, localNumberOfIndividualRows);
}
private void scheduleRenewLeaseTask() {
@@ -258,14 +260,14 @@ class AsyncScanSingleRegionRpcRetryingCaller {
// return false if the scan has already been resumed. See the comment above for ScanResumerImpl
// for more details.
- synchronized boolean prepare(ScanResponse resp, int numValidResults) {
+ synchronized boolean prepare(ScanResponse resp, int numberOfIndividualRows) {
if (state == ScanResumerState.RESUMED) {
// user calls resume before we actually suspend the scan, just continue;
return false;
}
state = ScanResumerState.SUSPENDED;
this.resp = resp;
- this.numValidResults = numValidResults;
+ this.numberOfIndividualRows = numberOfIndividualRows;
// if there are no more results in region then the scanner at RS side will be closed
// automatically so we do not need to renew lease.
if (resp.getMoreResultsInRegion()) {
@@ -402,7 +404,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private void updateNextStartRowWhenError(Result result) {
nextStartRowWhenError = result.getRow();
- includeNextStartRowWhenError = result.mayHaveMoreCellsInRow();
+ includeNextStartRowWhenError = result.hasMoreCellsInRow();
}
private void completeWhenNoMoreResultsInRegion() {
@@ -421,7 +423,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
}
}
- private void completeOrNext(ScanResponse resp, int numValidResults) {
+ private void completeOrNext(ScanResponse resp, int numIndividualRows) {
if (resp.hasMoreResults() && !resp.getMoreResults()) {
// RS tells us there is no more data for the whole scan
completeNoMoreResults();
@@ -429,10 +431,10 @@ class AsyncScanSingleRegionRpcRetryingCaller {
}
if (scan.getLimit() > 0) {
// The RS should have set the moreResults field in ScanResponse to false when we have reached
- // the limit.
- int limit = scan.getLimit() - numValidResults;
- assert limit > 0;
- scan.setLimit(limit);
+ // the limit, so we add an assert here.
+ int newLimit = scan.getLimit() - numIndividualRows;
+ assert newLimit > 0;
+ scan.setLimit(newLimit);
}
// as in 2.0 this value will always be set
if (!resp.getMoreResultsInRegion()) {
@@ -462,10 +464,12 @@ class AsyncScanSingleRegionRpcRetryingCaller {
completeWhenError(true);
return;
}
-
+ // calculate this before calling onNext as it is free for user to modify the result array in
+ // onNext.
+ int numberOfIndividualRows = numberOfIndividualRows(Arrays.asList(results));
ScanControllerImpl scanController = new ScanControllerImpl();
if (results.length == 0) {
- // if we have nothing to return then this must be a heartbeat message.
+ // if we have nothing to return then just call onHeartbeat.
consumer.onHeartbeat(scanController);
} else {
updateNextStartRowWhenError(results[results.length - 1]);
@@ -482,11 +486,11 @@ class AsyncScanSingleRegionRpcRetryingCaller {
return;
}
if (state == ScanControllerState.SUSPENDED) {
- if (scanController.resumer.prepare(resp, results.length)) {
+ if (scanController.resumer.prepare(resp, numberOfIndividualRows)) {
return;
}
}
- completeOrNext(resp, results.length);
+ completeOrNext(resp, numberOfIndividualRows);
}
private void call() {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 59c52de23cf..47270a72c40 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.numberOfIndividualRows;
+
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -51,8 +54,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos;
import org.apache.hadoop.hbase.util.Bytes;
-import com.google.common.annotations.VisibleForTesting;
-
/**
* Implements the scanner interface for the HBase client. If there are multiple regions in a table,
* this scanner will iterate through them all.
@@ -405,7 +406,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
// If the lastRow is not partial, then we should start from the next row. As now we can
// exclude the start row, the logic here is the same for both normal scan and reversed scan.
// If lastResult is partial then include it, otherwise exclude it.
- scan.withStartRow(lastResult.getRow(), lastResult.isPartial() || scan.getBatch() > 0);
+ scan.withStartRow(lastResult.getRow(), lastResult.hasMoreCellsInRow());
}
if (e instanceof OutOfOrderScannerNextException) {
if (retryAfterOutOfOrderException.isTrue()) {
@@ -496,16 +497,16 @@ public abstract class ClientScanner extends AbstractClientScanner {
remainingResultSize -= estimatedHeapSizeOfResult;
addEstimatedSize(estimatedHeapSizeOfResult);
this.lastResult = rs;
- if (this.lastResult.isPartial() || scan.getBatch() > 0) {
+ if (this.lastResult.hasMoreCellsInRow()) {
updateLastCellLoadedToCache(this.lastResult);
} else {
this.lastCellLoadedToCache = null;
}
}
- if (scan.getLimit() > 0) {
- int limit = scan.getLimit() - resultsToAddToCache.size();
- assert limit >= 0;
- scan.setLimit(limit);
+ if (scan.getLimit() > 0 && !resultsToAddToCache.isEmpty()) {
+ int newLimit = scan.getLimit() - numberOfIndividualRows(resultsToAddToCache);
+ assert newLimit >= 0;
+ scan.setLimit(newLimit);
}
}
if (scanExhausted(values)) {
@@ -620,7 +621,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
// In every RPC response there should be at most a single partial result. Furthermore, if
// there is a partial result, it is guaranteed to be in the last position of the array.
Result last = resultsFromServer[resultsFromServer.length - 1];
- Result partial = last.isPartial() ? last : null;
+ Result partial = last.hasMoreCellsInRow() ? last : null;
if (LOG.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
@@ -666,7 +667,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
// If the result is not a partial, it is a signal to us that it is the last Result we
// need to form the complete Result client-side
- if (!result.isPartial()) {
+ if (!result.hasMoreCellsInRow()) {
resultsToAddToCache.add(Result.createCompleteResult(partialResults));
clearPartialResults();
}
@@ -682,7 +683,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
// It's possible that in one response from the server we receive the final partial for
// one row and receive a partial for a different row. Thus, make sure that all Results
// are added to the proper list
- if (result.isPartial()) {
+ if (result.hasMoreCellsInRow()) {
addToPartialResults(result);
} else {
resultsToAddToCache.add(result);
@@ -824,7 +825,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
index++;
}
Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length);
- return Result.create(list, result.getExists(), result.isStale(), result.isPartial());
+ return Result.create(list, result.getExists(), result.isStale(), result.hasMoreCellsInRow());
}
protected void initCache() {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
index 9dfb8f73143..bc79e041b98 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java
@@ -76,7 +76,7 @@ class CompleteScanResultCache implements ScanResultCache {
// In every RPC response there should be at most a single partial result. Furthermore, if
// there is a partial result, it is guaranteed to be in the last position of the array.
Result last = results[results.length - 1];
- if (last.isPartial()) {
+ if (last.hasMoreCellsInRow()) {
if (partialResults.isEmpty()) {
partialResults.add(last);
return Arrays.copyOf(results, results.length - 1);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 852ffdcba34..28f0cee4c0f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -322,7 +322,7 @@ public final class ConnectionUtils {
return null;
}
return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null,
- result.isStale(), result.mayHaveMoreCellsInRow());
+ result.isStale(), result.hasMoreCellsInRow());
}
// Add a delta to avoid timeout immediately after a retry sleeping.
@@ -381,4 +381,21 @@ public final class ConnectionUtils {
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList()));
}
+
+ /**
+ * Count the individual rows for the given result list.
+ * <p>
+ * There are two reasons why we need to use this method instead of a simple
+ * {@code results.length}.
+ * <ol>
+ * <li>Server may return only part of the whole cells of a row for the last result, and if
+ * allowPartial is true, we will return the array to user directly. We should not count the last
+ * result.</li>
+ * <li>If this is a batched scan, a row may be split into several results, but they should be
+ * counted as one row. For example, a row with 15 cells will be split into 3 results with 5 cells
+ * each if {@code scan.getBatch()} is 5.</li>
+ * </ol>
+ */
+ public static int numberOfIndividualRows(List<Result> results) {
+ return (int) results.stream().filter(r -> !r.hasMoreCellsInRow()).count();
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 1fa33b0be42..0c383fce0f2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -330,10 +330,6 @@ public class HTable implements Table {
*/
@Override
public ResultScanner getScanner(Scan scan) throws IOException {
- if (scan.getBatch() > 0 && scan.isSmall()) {
- throw new IllegalArgumentException("Small scan should not be used with batching");
- }
-
if (scan.getCaching() <= 0) {
scan.setCaching(scannerCaching);
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index fa3d792c68d..7948b654c01 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -376,15 +376,8 @@ class RawAsyncTableImpl implements RawAsyncTable {
}
public void scan(Scan scan, RawScanResultConsumer consumer) {
- if (scan.isSmall() || scan.getLimit() > 0) {
- if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
- consumer.onError(new IllegalArgumentException(
- "Batch and allowPartial is not allowed for small scan or limited scan"));
- }
- }
- scan = setDefaultScanConfig(scan);
- new AsyncClientScanner(scan, consumer, tableName, conn, pauseNs, maxAttempts, scanTimeoutNs,
- readRpcTimeoutNs, startLogErrorsCnt).start();
+ new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, pauseNs,
+ maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
}
@Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
index 6ed87590ecd..232e3d3b136 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
@@ -84,10 +84,9 @@ public class Result implements CellScannable, CellScanner {
private boolean stale = false;
/**
- * See {@link #mayHaveMoreCellsInRow()}. And please notice that, The client side implementation
- * should also check for row key change to determine if a Result is the last one for a row.
+ * See {@link #hasMoreCellsInRow()}.
*/
- private boolean mayHaveMoreCellsInRow = false;
+ private boolean hasMoreCellsInRow = false;
// We're not using java serialization. Transient here is just a marker to say
// that this is where we cache row if we're ever asked for it.
private transient byte [] row = null;
@@ -178,7 +177,7 @@ public class Result implements CellScannable, CellScanner {
this.cells = cells;
this.exists = exists;
this.stale = stale;
- this.mayHaveMoreCellsInRow = mayHaveMoreCellsInRow;
+ this.hasMoreCellsInRow = mayHaveMoreCellsInRow;
this.readonly = false;
}
@@ -823,7 +822,7 @@ public class Result implements CellScannable, CellScanner {
// Result1: -1- -2- (2 cells, size limit reached, mark as partial)
// Result2: -3- -4- (2 cells, size limit reached, mark as partial)
// Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial)
- if (i != (partialResults.size() - 1) && !r.mayHaveMoreCellsInRow()) {
+ if (i != (partialResults.size() - 1) && !r.hasMoreCellsInRow()) {
throw new IOException(
"Cannot form complete result. Result is missing partial flag. " +
"Partial Results: " + partialResults);
@@ -910,28 +909,26 @@ public class Result implements CellScannable, CellScanner {
* for a row and should be combined with a result representing the remaining cells in that row to
* form a complete (non-partial) result.
* @return Whether or not the result is a partial result
- * @deprecated the word 'partial' ambiguous, use {@link #mayHaveMoreCellsInRow()} instead.
+ * @deprecated the word 'partial' ambiguous, use {@link #hasMoreCellsInRow()} instead.
* Deprecated since 1.4.0.
- * @see #mayHaveMoreCellsInRow()
+ * @see #hasMoreCellsInRow()
*/
@Deprecated
public boolean isPartial() {
- return mayHaveMoreCellsInRow;
+ return hasMoreCellsInRow;
}
/**
* For scanning large rows, the RS may choose to return the cells chunk by chunk to prevent OOM.
* This flag is used to tell you if the current Result is the last one of the current row. False
- * means this Result is the last one. True means there may still be more cells for the current
- * row. Notice that, 'may' have, not must have. This is because we may reach the size or time
- * limit just at the last cell of row at RS, so we do not know if it is the last one.
+ * means this Result is the last one. True means there are more cells for the current row.
*
* The Scan configuration used to control the result size on the server is
* {@link Scan#setMaxResultSize(long)} and the default value can be seen here:
* {@link HConstants#DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE}
*/
- public boolean mayHaveMoreCellsInRow() {
- return mayHaveMoreCellsInRow;
+ public boolean hasMoreCellsInRow() {
+ return hasMoreCellsInRow;
}
/**
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index d935a08a533..52ee8a56b30 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -1329,7 +1329,7 @@ public final class ProtobufUtil {
}
builder.setStale(result.isStale());
- builder.setPartial(result.mayHaveMoreCellsInRow());
+ builder.setPartial(result.hasMoreCellsInRow());
return builder.build();
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 4b0ba6edca2..271a0def2c9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -1443,7 +1443,7 @@ public final class ProtobufUtil {
}
builder.setStale(result.isStale());
- builder.setPartial(result.mayHaveMoreCellsInRow());
+ builder.setPartial(result.hasMoreCellsInRow());
return builder.build();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 4f08be91edc..a4dc9741a95 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5970,7 +5970,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// If the size limit was reached it means a partial Result is being returned. Returning a
// partial Result means that we should not reset the filters; filters should only be reset in
// between rows
- if (!scannerContext.mayHaveMoreCellsInRow()) {
+ if (!scannerContext.hasMoreCellsInRow()) {
resetFilters();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 984b9653731..e6c2a49d92e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -350,16 +350,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
private final String scannerName;
private final RegionScanner s;
private final Region r;
- private final boolean allowPartial;
private final RpcCallback closeCallBack;
private final RpcCallback shippedCallback;
- public RegionScannerHolder(String scannerName, RegionScanner s, Region r, boolean allowPartial,
+ public RegionScannerHolder(String scannerName, RegionScanner s, Region r,
RpcCallback closeCallBack, RpcCallback shippedCallback) {
this.scannerName = scannerName;
this.s = s;
this.r = r;
- this.allowPartial = allowPartial;
this.closeCallBack = closeCallBack;
this.shippedCallback = shippedCallback;
}
@@ -488,7 +486,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (clientCellBlockSupported) {
for (Result res : results) {
builder.addCellsPerResult(res.size());
- builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
+ builder.addPartialFlagPerResult(res.hasMoreCellsInRow());
}
controller.setCellScanner(CellUtil.createCellScanner(results));
} else {
@@ -1212,8 +1210,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return lastBlock;
}
- private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r,
- boolean allowPartial) throws LeaseStillHeldException {
+ private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r)
+ throws LeaseStillHeldException {
Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
new ScannerListener(scannerName));
RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, s, lease);
@@ -1224,7 +1222,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
closeCallback = new RegionScannerCloseCallBack(s);
}
RegionScannerHolder rsh =
- new RegionScannerHolder(scannerName, s, r, allowPartial, closeCallback, shippedCallback);
+ new RegionScannerHolder(scannerName, s, r, closeCallback, shippedCallback);
RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
return rsh;
@@ -2722,8 +2720,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
builder.setMvccReadPoint(scanner.getMvccReadPoint());
builder.setTtl(scannerLeaseTimeoutPeriod);
String scannerName = String.valueOf(scannerId);
- return addScanner(scannerName, scanner, region,
- !scan.isSmall() && !(request.hasLimitOfRows() && request.getLimitOfRows() > 0));
+ return addScanner(scannerName, scanner, region);
}
private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
@@ -2779,7 +2776,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// return whether we have more results in region.
private boolean scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
- long maxQuotaResultSize, int rows, List<Result> results, ScanResponse.Builder builder,
+ long maxQuotaResultSize, int maxResults, List<Result> results, ScanResponse.Builder builder,
MutableObject lastBlock, RpcCallContext context) throws IOException {
Region region = rsh.r;
RegionScanner scanner = rsh.s;
@@ -2810,8 +2807,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// correct ordering of partial results and so we prevent partial results from being
// formed.
boolean serverGuaranteesOrderOfPartials = results.isEmpty();
- boolean allowPartialResults =
- clientHandlesPartials && serverGuaranteesOrderOfPartials && rsh.allowPartial;
+ boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
boolean moreRows = false;
// Heartbeat messages occur when the processing of the ScanRequest is exceeds a
@@ -2843,7 +2839,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
contextBuilder.setTrackMetrics(trackMetrics);
ScannerContext scannerContext = contextBuilder.build();
boolean limitReached = false;
- while (i < rows) {
+ while (i < maxResults) {
// Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
// batch limit is a limit on the number of cells per Result. Thus, if progress is
// being tracked (i.e. scannerContext.keepProgress() is true) then we need to
@@ -2855,7 +2851,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
moreRows = scanner.nextRaw(values, scannerContext);
if (!values.isEmpty()) {
- Result r = Result.create(values, null, stale, scannerContext.mayHaveMoreCellsInRow());
+ Result r = Result.create(values, null, stale, scannerContext.hasMoreCellsInRow());
lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
results.add(r);
i++;
@@ -2863,7 +2859,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
- boolean rowLimitReached = i >= rows;
+ boolean rowLimitReached = i >= maxResults;
limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;
if (limitReached || !moreRows) {
@@ -2920,7 +2916,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
// coprocessor postNext hook
if (region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
+ region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
}
return builder.getMoreResultsInRegion();
}
@@ -3073,8 +3069,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// with the old scan implementation where we just ignore the returned results if moreResults
// is false. Can remove the isEmpty check after we get rid of the old implementation.
moreResults = false;
- } else if (limitOfRows > 0 && results.size() >= limitOfRows
- && !results.get(results.size() - 1).mayHaveMoreCellsInRow()) {
+ } else if (limitOfRows > 0 && !results.isEmpty() &&
+ !results.get(results.size() - 1).hasMoreCellsInRow() &&
+ ConnectionUtils.numberOfIndividualRows(results) >= limitOfRows) {
// if we have reached the limit of rows
moreResults = false;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
index ad327723332..15e2ec01c86 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
@@ -225,10 +225,10 @@ public class ScannerContext {
}
/**
- * @return true when we may have more cells for the current row. This usually because we have
- * reached a limit in the middle of a row
+ * @return true when we have more cells for the current row. This is usually because we have
+ * a limit in the middle of a row
*/
- boolean mayHaveMoreCellsInRow() {
+ boolean hasMoreCellsInRow() {
return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW ||
scannerState == NextState.TIME_LIMIT_REACHED_MID_ROW ||
scannerState == NextState.BATCH_LIMIT_REACHED;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
index 2ebfa6a04b4..83f31010bc6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java
@@ -594,36 +594,6 @@ public class TestPartialResultsFromClientSide {
scanner.close();
}
- /**
- * Small scans should not return partial results because it would prevent small scans from
- * retrieving all of the necessary results in a single RPC request which is what makese small
- * scans useful. Thus, ensure that even when {@link Scan#getAllowPartialResults()} is true, small
- * scans do not return partial results
- * @throws Exception
- */
- @Test
- public void testSmallScansDoNotAllowPartials() throws Exception {
- Scan scan = new Scan();
- testSmallScansDoNotAllowPartials(scan);
- scan.setReversed(true);
- testSmallScansDoNotAllowPartials(scan);
- }
-
- public void testSmallScansDoNotAllowPartials(Scan baseScan) throws Exception {
- Scan scan = new Scan(baseScan);
- scan.setAllowPartialResults(true);
- scan.setSmall(true);
- scan.setMaxResultSize(1);
- ResultScanner scanner = TABLE.getScanner(scan);
- Result r = null;
-
- while ((r = scanner.next()) != null) {
- assertFalse(r.isPartial());
- }
-
- scanner.close();
- }
-
/**
* Make puts to put the input value into each combination of row, family, and qualifier
* @param rows
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
index d0c980618af..7bab8948c62 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java
@@ -24,12 +24,14 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -72,20 +74,41 @@ public abstract class AbstractTestAsyncTableScan {
TEST_UTIL.shutdownMiniCluster();
}
+ protected static Scan createNormalScan() {
+ return new Scan();
+ }
+
+ protected static Scan createBatchScan() {
+ return new Scan().setBatch(1);
+ }
+
+ // set a small result size for testing flow control
+ protected static Scan createSmallResultSizeScan() {
+ return new Scan().setMaxResultSize(1);
+ }
+
+ protected static Scan createBatchSmallResultSizeScan() {
+ return new Scan().setBatch(1).setMaxResultSize(1);
+ }
+
+ protected static List<Pair<String, Supplier<Scan>>> getScanCreater() {
+ return Arrays.asList(Pair.newPair("normal", AbstractTestAsyncTableScan::createNormalScan),
+ Pair.newPair("batch", AbstractTestAsyncTableScan::createBatchScan),
+ Pair.newPair("smallResultSize", AbstractTestAsyncTableScan::createSmallResultSizeScan),
+ Pair.newPair("batchSmallResultSize",
+ AbstractTestAsyncTableScan::createBatchSmallResultSizeScan));
+ }
+
protected abstract Scan createScan();
protected abstract List<Result> doScan(Scan scan) throws Exception;
- private Result convertToPartial(Result result) {
- return Result.create(result.rawCells(), result.getExists(), result.isStale(), true);
- }
-
protected final List<Result> convertFromBatchResult(List<Result> results) {
assertTrue(results.size() % 2 == 0);
return IntStream.range(0, results.size() / 2).mapToObj(i -> {
try {
- return Result.createCompleteResult(Arrays.asList(convertToPartial(results.get(2 * i)),
- convertToPartial(results.get(2 * i + 1))));
+ return Result
+ .createCompleteResult(Arrays.asList(results.get(2 * i), results.get(2 * i + 1)));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
@@ -98,8 +121,8 @@ public abstract class AbstractTestAsyncTableScan {
// make sure all scanners are closed at RS side
TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
.forEach(rs -> assertEquals(
- "The scanner count of " + rs.getServerName() + " is "
- + rs.getRSRpcServices().getScannersCount(),
+ "The scanner count of " + rs.getServerName() + " is " +
+ rs.getRSRpcServices().getScannersCount(),
0, rs.getRSRpcServices().getScannersCount()));
assertEquals(COUNT, results.size());
IntStream.range(0, COUNT).forEach(i -> {
@@ -140,61 +163,112 @@ public abstract class AbstractTestAsyncTableScan {
IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i));
}
- private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive)
- throws Exception {
- List<Result> results = doScan(
- createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
- .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive));
+ private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
+ int limit) throws Exception {
+ Scan scan =
+ createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
+ .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive);
+ if (limit > 0) {
+ scan.setLimit(limit);
+ }
+ List<Result> results = doScan(scan);
int actualStart = startInclusive ? start : start + 1;
int actualStop = stopInclusive ? stop + 1 : stop;
- assertEquals(actualStop - actualStart, results.size());
- IntStream.range(0, actualStop - actualStart)
- .forEach(i -> assertResultEquals(results.get(i), actualStart + i));
+ int count = actualStop - actualStart;
+ if (limit > 0) {
+ count = Math.min(count, limit);
+ }
+ assertEquals(count, results.size());
+ IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart + i));
}
- private void testReversedScan(int start, boolean startInclusive, int stop, boolean stopInclusive)
- throws Exception {
- List<Result> results = doScan(createScan()
+ private void testReversedScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
+ int limit) throws Exception {
+ Scan scan = createScan()
.withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
- .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive).setReversed(true));
+ .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive).setReversed(true);
+ if (limit > 0) {
+ scan.setLimit(limit);
+ }
+ List<Result> results = doScan(scan);
int actualStart = startInclusive ? start : start - 1;
int actualStop = stopInclusive ? stop - 1 : stop;
- assertEquals(actualStart - actualStop, results.size());
- IntStream.range(0, actualStart - actualStop)
- .forEach(i -> assertResultEquals(results.get(i), actualStart - i));
+ int count = actualStart - actualStop;
+ if (limit > 0) {
+ count = Math.min(count, limit);
+ }
+ assertEquals(count, results.size());
+ IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart - i));
}
@Test
public void testScanWithStartKeyAndStopKey() throws Exception {
- testScan(1, true, 998, false); // from first region to last region
- testScan(123, true, 345, true);
- testScan(234, true, 456, false);
- testScan(345, false, 567, true);
- testScan(456, false, 678, false);
+ testScan(1, true, 998, false, -1); // from first region to last region
+ testScan(123, true, 345, true, -1);
+ testScan(234, true, 456, false, -1);
+ testScan(345, false, 567, true, -1);
+ testScan(456, false, 678, false, -1);
}
@Test
public void testReversedScanWithStartKeyAndStopKey() throws Exception {
- testReversedScan(998, true, 1, false); // from last region to first region
- testReversedScan(543, true, 321, true);
- testReversedScan(654, true, 432, false);
- testReversedScan(765, false, 543, true);
- testReversedScan(876, false, 654, false);
+ testReversedScan(998, true, 1, false, -1); // from last region to first region
+ testReversedScan(543, true, 321, true, -1);
+ testReversedScan(654, true, 432, false, -1);
+ testReversedScan(765, false, 543, true, -1);
+ testReversedScan(876, false, 654, false, -1);
}
@Test
public void testScanAtRegionBoundary() throws Exception {
- testScan(222, true, 333, true);
- testScan(333, true, 444, false);
- testScan(444, false, 555, true);
- testScan(555, false, 666, false);
+ testScan(222, true, 333, true, -1);
+ testScan(333, true, 444, false, -1);
+ testScan(444, false, 555, true, -1);
+ testScan(555, false, 666, false, -1);
}
@Test
public void testReversedScanAtRegionBoundary() throws Exception {
- testReversedScan(333, true, 222, true);
- testReversedScan(444, true, 333, false);
- testReversedScan(555, false, 444, true);
- testReversedScan(666, false, 555, false);
+ testReversedScan(333, true, 222, true, -1);
+ testReversedScan(444, true, 333, false, -1);
+ testReversedScan(555, false, 444, true, -1);
+ testReversedScan(666, false, 555, false, -1);
+ }
+
+ @Test
+ public void testScanWithLimit() throws Exception {
+ testScan(1, true, 998, false, 900); // from first region to last region
+ testScan(123, true, 345, true, 100);
+ testScan(234, true, 456, false, 100);
+ testScan(345, false, 567, true, 100);
+ testScan(456, false, 678, false, 100);
+
+ }
+
+ @Test
+ public void testScanWithLimitGreaterThanActualCount() throws Exception {
+ testScan(1, true, 998, false, 1000); // from first region to last region
+ testScan(123, true, 345, true, 200);
+ testScan(234, true, 456, false, 200);
+ testScan(345, false, 567, true, 200);
+ testScan(456, false, 678, false, 200);
+ }
+
+ @Test
+ public void testReversedScanWithLimit() throws Exception {
+ testReversedScan(998, true, 1, false, 900); // from last region to first region
+ testReversedScan(543, true, 321, true, 100);
+ testReversedScan(654, true, 432, false, 100);
+ testReversedScan(765, false, 543, true, 100);
+ testReversedScan(876, false, 654, false, 100);
+ }
+
+ @Test
+ public void testReversedScanWithLimitGreaterThanActualCount() throws Exception {
+ testReversedScan(998, true, 1, false, 1000); // from last region to first region
+ testReversedScan(543, true, 321, true, 200);
+ testReversedScan(654, true, 432, false, 200);
+ testReversedScan(765, false, 543, true, 200);
+ testReversedScan(876, false, 654, false, 200);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java
index f151e832f4a..a8aad0b09d3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java
@@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.client;
import com.google.common.base.Throwables;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -76,32 +76,16 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
}
}
- @Parameter
+ @Parameter(0)
+ public String scanType;
+
+ @Parameter(1)
public Supplier<Scan> scanCreater;
- @Parameters
+ @Parameters(name = "{index}: scan={0}")
public static List