HBASE-17595 Add partial result support for small/limited scan
Parent: ff045cab84
Commit: 8fb44fae35
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
import static org.apache.hadoop.hbase.client.ConnectionUtils.numberOfIndividualRows;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;

@@ -31,6 +32,7 @@ import io.netty.util.Timeout;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

@@ -202,7 +204,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private ScanResponse resp;
private int numValidResults;
private int numberOfIndividualRows;
// If the scan is suspended successfully, we need to do lease renewal to prevent it being closed
// by RS due to lease expire. It is a one-time timer task so we need to schedule a new task

@@ -219,7 +221,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
// resume is called after suspend, then it is also safe to just reference resp and
// numValidResults after the synchronized block as no one will change it anymore.
ScanResponse localResp;
int localNumValidResults;
int localNumberOfIndividualRows;
synchronized (this) {
if (state == ScanResumerState.INITIALIZED) {
// user calls this method before we call prepare, so just set the state to

@@ -236,9 +238,9 @@ class AsyncScanSingleRegionRpcRetryingCaller {
leaseRenewer.cancel();
}
localResp = this.resp;
localNumValidResults = this.numValidResults;
localNumberOfIndividualRows = this.numberOfIndividualRows;
}
completeOrNext(localResp, localNumValidResults);
completeOrNext(localResp, localNumberOfIndividualRows);
}
private void scheduleRenewLeaseTask() {

@@ -258,14 +260,14 @@ class AsyncScanSingleRegionRpcRetryingCaller {
// return false if the scan has already been resumed. See the comment above for ScanResumerImpl
// for more details.
synchronized boolean prepare(ScanResponse resp, int numValidResults) {
synchronized boolean prepare(ScanResponse resp, int numberOfIndividualRows) {
if (state == ScanResumerState.RESUMED) {
// user calls resume before we actually suspend the scan, just continue;
return false;
}
state = ScanResumerState.SUSPENDED;
this.resp = resp;
this.numValidResults = numValidResults;
this.numberOfIndividualRows = numberOfIndividualRows;
// if there are no more results in region then the scanner at RS side will be closed
// automatically so we do not need to renew lease.
if (resp.getMoreResultsInRegion()) {

@@ -402,7 +404,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private void updateNextStartRowWhenError(Result result) {
nextStartRowWhenError = result.getRow();
includeNextStartRowWhenError = result.mayHaveMoreCellsInRow();
includeNextStartRowWhenError = result.hasMoreCellsInRow();
}
private void completeWhenNoMoreResultsInRegion() {

@@ -421,7 +423,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
}
}
private void completeOrNext(ScanResponse resp, int numValidResults) {
private void completeOrNext(ScanResponse resp, int numIndividualRows) {
if (resp.hasMoreResults() && !resp.getMoreResults()) {
// RS tells us there is no more data for the whole scan
completeNoMoreResults();

@@ -429,10 +431,10 @@ class AsyncScanSingleRegionRpcRetryingCaller {
}
if (scan.getLimit() > 0) {
// The RS should have set the moreResults field in ScanResponse to false when we have reached
// the limit.
int limit = scan.getLimit() - numValidResults;
assert limit > 0;
scan.setLimit(limit);
// the limit, so we add an assert here.
int newLimit = scan.getLimit() - numIndividualRows;
assert newLimit > 0;
scan.setLimit(newLimit);
}
// as in 2.0 this value will always be set
if (!resp.getMoreResultsInRegion()) {

@@ -462,10 +464,12 @@ class AsyncScanSingleRegionRpcRetryingCaller {
completeWhenError(true);
return;
}
// calculate this before calling onNext as it is free for user to modify the result array in
// onNext.
int numberOfIndividualRows = numberOfIndividualRows(Arrays.asList(results));
ScanControllerImpl scanController = new ScanControllerImpl();
if (results.length == 0) {
// if we have nothing to return then this must be a heartbeat message.
// if we have nothing to return then just call onHeartbeat.
consumer.onHeartbeat(scanController);
} else {
updateNextStartRowWhenError(results[results.length - 1]);

@@ -482,11 +486,11 @@ class AsyncScanSingleRegionRpcRetryingCaller {
return;
}
if (state == ScanControllerState.SUSPENDED) {
if (scanController.resumer.prepare(resp, results.length)) {
if (scanController.resumer.prepare(resp, numberOfIndividualRows)) {
return;
}
}
completeOrNext(resp, results.length);
completeOrNext(resp, numberOfIndividualRows);
}
private void call() {
@@ -18,6 +18,9 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
import static org.apache.hadoop.hbase.client.ConnectionUtils.numberOfIndividualRows;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;

@@ -51,8 +54,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.annotations.VisibleForTesting;
/**
* Implements the scanner interface for the HBase client. If there are multiple regions in a table,
* this scanner will iterate through them all.

@@ -405,7 +406,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
// If the lastRow is not partial, then we should start from the next row. As now we can
// exclude the start row, the logic here is the same for both normal scan and reversed scan.
// If lastResult is partial then include it, otherwise exclude it.
scan.withStartRow(lastResult.getRow(), lastResult.isPartial() || scan.getBatch() > 0);
scan.withStartRow(lastResult.getRow(), lastResult.hasMoreCellsInRow());
}
if (e instanceof OutOfOrderScannerNextException) {
if (retryAfterOutOfOrderException.isTrue()) {

@@ -496,16 +497,16 @@ public abstract class ClientScanner extends AbstractClientScanner {
remainingResultSize -= estimatedHeapSizeOfResult;
addEstimatedSize(estimatedHeapSizeOfResult);
this.lastResult = rs;
if (this.lastResult.isPartial() || scan.getBatch() > 0) {
if (this.lastResult.hasMoreCellsInRow()) {
updateLastCellLoadedToCache(this.lastResult);
} else {
this.lastCellLoadedToCache = null;
}
}
if (scan.getLimit() > 0) {
int limit = scan.getLimit() - resultsToAddToCache.size();
assert limit >= 0;
scan.setLimit(limit);
if (scan.getLimit() > 0 && !resultsToAddToCache.isEmpty()) {
int newLimit = scan.getLimit() - numberOfIndividualRows(resultsToAddToCache);
assert newLimit >= 0;
scan.setLimit(newLimit);
}
}
if (scanExhausted(values)) {

@@ -620,7 +621,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
// In every RPC response there should be at most a single partial result. Furthermore, if
// there is a partial result, it is guaranteed to be in the last position of the array.
Result last = resultsFromServer[resultsFromServer.length - 1];
Result partial = last.isPartial() ? last : null;
Result partial = last.hasMoreCellsInRow() ? last : null;
if (LOG.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();

@@ -666,7 +667,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
// If the result is not a partial, it is a signal to us that it is the last Result we
// need to form the complete Result client-side
if (!result.isPartial()) {
if (!result.hasMoreCellsInRow()) {
resultsToAddToCache.add(Result.createCompleteResult(partialResults));
clearPartialResults();
}

@@ -682,7 +683,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
// It's possible that in one response from the server we receive the final partial for
// one row and receive a partial for a different row. Thus, make sure that all Results
// are added to the proper list
if (result.isPartial()) {
if (result.hasMoreCellsInRow()) {
addToPartialResults(result);
} else {
resultsToAddToCache.add(result);

@@ -824,7 +825,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
index++;
}
Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length);
return Result.create(list, result.getExists(), result.isStale(), result.isPartial());
return Result.create(list, result.getExists(), result.isStale(), result.hasMoreCellsInRow());
}
protected void initCache() {
@@ -76,7 +76,7 @@ class CompleteScanResultCache implements ScanResultCache {
// In every RPC response there should be at most a single partial result. Furthermore, if
// there is a partial result, it is guaranteed to be in the last position of the array.
Result last = results[results.length - 1];
if (last.isPartial()) {
if (last.hasMoreCellsInRow()) {
if (partialResults.isEmpty()) {
partialResults.add(last);
return Arrays.copyOf(results, results.length - 1);
@@ -322,7 +322,7 @@ public final class ConnectionUtils {
return null;
}
return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null,
result.isStale(), result.mayHaveMoreCellsInRow());
result.isStale(), result.hasMoreCellsInRow());
}
// Add a delta to avoid timeout immediately after a retry sleeping.

@@ -381,4 +381,21 @@ public final class ConnectionUtils {
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList()));
}
/**
* Count the individual rows for the given result list.
* <p>
* There are two reason why we need to use this method instead of a simple {@code results.length}.
* <ol>
* <li>Server may return only part of the whole cells of a row for the last result, and if
* allowPartial is true, we will return the array to user directly. We should not count the last
* result.</li>
* <li>If this is a batched scan, a row may be split into several results, but they should be
* counted as one row. For example, a row with 15 cells will be split into 3 results with 5 cells
* each if {@code scan.getBatch()} is 5.</li>
* </ol>
*/
public static int numberOfIndividualRows(List<Result> results) {
return (int) results.stream().filter(r -> !r.hasMoreCellsInRow()).count();
}
}
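A standalone, hedged illustration of the counting rule documented above (one row scanned with a batch limit arrives as several Results, only the last of which closes the row). It uses only APIs that appear in this diff — Result.create(List<Cell>, Boolean, boolean, boolean) and the new Result.hasMoreCellsInRow() — plus KeyValue; the row, family, and qualifier names are made up for the example:

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class RowCountingExample {
  // Build one chunk of a row; moreCellsInRow mirrors the flag the server sets.
  private static Result chunk(String qualifier, boolean moreCellsInRow) {
    Cell cell = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"),
        Bytes.toBytes(qualifier), Bytes.toBytes("value"));
    return Result.create(Arrays.asList(cell), null, false, moreCellsInRow);
  }

  public static void main(String[] args) {
    // One row returned as three Results (e.g. a batched scan); only the last
    // Result closes the row, so the list counts as a single individual row.
    List<Result> results =
        Arrays.asList(chunk("c1", true), chunk("c2", true), chunk("c3", false));
    long rows = results.stream().filter(r -> !r.hasMoreCellsInRow()).count();
    System.out.println(rows + " row from " + results.size() + " Results"); // 1 row from 3 Results
  }
}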
@@ -330,10 +330,6 @@ public class HTable implements Table {
*/
@Override
public ResultScanner getScanner(Scan scan) throws IOException {
if (scan.getBatch() > 0 && scan.isSmall()) {
throw new IllegalArgumentException("Small scan should not be used with batching");
}
if (scan.getCaching() <= 0) {
scan.setCaching(scannerCaching);
}
@@ -376,15 +376,8 @@ class RawAsyncTableImpl implements RawAsyncTable {
}
public void scan(Scan scan, RawScanResultConsumer consumer) {
if (scan.isSmall() || scan.getLimit() > 0) {
if (scan.getBatch() > 0 || scan.getAllowPartialResults()) {
consumer.onError(new IllegalArgumentException(
"Batch and allowPartial is not allowed for small scan or limited scan"));
}
}
scan = setDefaultScanConfig(scan);
new AsyncClientScanner(scan, consumer, tableName, conn, pauseNs, maxAttempts, scanTimeoutNs,
readRpcTimeoutNs, startLogErrorsCnt).start();
new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, pauseNs,
maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
}
@Override
@@ -84,10 +84,9 @@ public class Result implements CellScannable, CellScanner {
private boolean stale = false;
/**
* See {@link #mayHaveMoreCellsInRow()}. And please notice that, The client side implementation
* should also check for row key change to determine if a Result is the last one for a row.
* See {@link #hasMoreCellsInRow()}.
*/
private boolean mayHaveMoreCellsInRow = false;
private boolean hasMoreCellsInRow = false;
// We're not using java serialization. Transient here is just a marker to say
// that this is where we cache row if we're ever asked for it.
private transient byte [] row = null;

@@ -178,7 +177,7 @@ public class Result implements CellScannable, CellScanner {
this.cells = cells;
this.exists = exists;
this.stale = stale;
this.mayHaveMoreCellsInRow = mayHaveMoreCellsInRow;
this.hasMoreCellsInRow = mayHaveMoreCellsInRow;
this.readonly = false;
}

@@ -823,7 +822,7 @@ public class Result implements CellScannable, CellScanner {
// Result1: -1- -2- (2 cells, size limit reached, mark as partial)
// Result2: -3- -4- (2 cells, size limit reached, mark as partial)
// Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial)
if (i != (partialResults.size() - 1) && !r.mayHaveMoreCellsInRow()) {
if (i != (partialResults.size() - 1) && !r.hasMoreCellsInRow()) {
throw new IOException(
"Cannot form complete result. Result is missing partial flag. " +
"Partial Results: " + partialResults);

@@ -910,28 +909,26 @@ public class Result implements CellScannable, CellScanner {
* for a row and should be combined with a result representing the remaining cells in that row to
* form a complete (non-partial) result.
* @return Whether or not the result is a partial result
* @deprecated the word 'partial' ambiguous, use {@link #mayHaveMoreCellsInRow()} instead.
* @deprecated the word 'partial' ambiguous, use {@link #hasMoreCellsInRow()} instead.
* Deprecated since 1.4.0.
* @see #mayHaveMoreCellsInRow()
* @see #hasMoreCellsInRow()
*/
@Deprecated
public boolean isPartial() {
return mayHaveMoreCellsInRow;
return hasMoreCellsInRow;
}
/**
* For scanning large rows, the RS may choose to return the cells chunk by chunk to prevent OOM.
* This flag is used to tell you if the current Result is the last one of the current row. False
* means this Result is the last one. True means there may still be more cells for the current
* row. Notice that, 'may' have, not must have. This is because we may reach the size or time
* limit just at the last cell of row at RS, so we do not know if it is the last one.
* means this Result is the last one. True means there are be more cells for the current row.
* <p>
* The Scan configuration used to control the result size on the server is
* {@link Scan#setMaxResultSize(long)} and the default value can be seen here:
* {@link HConstants#DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE}
*/
public boolean mayHaveMoreCellsInRow() {
return mayHaveMoreCellsInRow;
public boolean hasMoreCellsInRow() {
return hasMoreCellsInRow;
}
/**
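A rough client-side sketch of how the flag documented above is meant to be consumed: stitch the chunks of an allowPartialResults or batched scan back into whole rows. It relies only on Result.hasMoreCellsInRow() and Result.createCompleteResult(...) as they appear in this diff; the helper class name is invented:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.Result;

public final class PartialResultStitcher {
  public static List<Result> stitch(List<Result> scanned) throws IOException {
    List<Result> complete = new ArrayList<>();
    List<Result> pending = new ArrayList<>();
    for (Result r : scanned) {
      pending.add(r);
      // hasMoreCellsInRow() == false marks the last chunk of the current row.
      if (!r.hasMoreCellsInRow()) {
        complete.add(Result.createCompleteResult(pending));
        pending.clear();
      }
    }
    // Any leftover chunks mean the scan stopped in the middle of a row.
    if (!pending.isEmpty()) {
      throw new IOException("Scan ended with an incomplete row: " + pending);
    }
    return complete;
  }
}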
@@ -1329,7 +1329,7 @@ public final class ProtobufUtil {
}
builder.setStale(result.isStale());
builder.setPartial(result.mayHaveMoreCellsInRow());
builder.setPartial(result.hasMoreCellsInRow());
return builder.build();
}

@@ -1443,7 +1443,7 @@ public final class ProtobufUtil {
}
builder.setStale(result.isStale());
builder.setPartial(result.mayHaveMoreCellsInRow());
builder.setPartial(result.hasMoreCellsInRow());
return builder.build();
}
@@ -5970,7 +5970,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// If the size limit was reached it means a partial Result is being returned. Returning a
// partial Result means that we should not reset the filters; filters should only be reset in
// between rows
if (!scannerContext.mayHaveMoreCellsInRow()) {
if (!scannerContext.hasMoreCellsInRow()) {
resetFilters();
}
@@ -350,16 +350,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
private final String scannerName;
private final RegionScanner s;
private final Region r;
private final boolean allowPartial;
private final RpcCallback closeCallBack;
private final RpcCallback shippedCallback;
public RegionScannerHolder(String scannerName, RegionScanner s, Region r, boolean allowPartial,
public RegionScannerHolder(String scannerName, RegionScanner s, Region r,
RpcCallback closeCallBack, RpcCallback shippedCallback) {
this.scannerName = scannerName;
this.s = s;
this.r = r;
this.allowPartial = allowPartial;
this.closeCallBack = closeCallBack;
this.shippedCallback = shippedCallback;
}

@@ -488,7 +486,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (clientCellBlockSupported) {
for (Result res : results) {
builder.addCellsPerResult(res.size());
builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
builder.addPartialFlagPerResult(res.hasMoreCellsInRow());
}
controller.setCellScanner(CellUtil.createCellScanner(results));
} else {

@@ -1212,8 +1210,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return lastBlock;
}
private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r,
boolean allowPartial) throws LeaseStillHeldException {
private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r)
throws LeaseStillHeldException {
Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
new ScannerListener(scannerName));
RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, s, lease);

@@ -1224,7 +1222,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
closeCallback = new RegionScannerCloseCallBack(s);
}
RegionScannerHolder rsh =
new RegionScannerHolder(scannerName, s, r, allowPartial, closeCallback, shippedCallback);
new RegionScannerHolder(scannerName, s, r, closeCallback, shippedCallback);
RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
return rsh;

@@ -2722,8 +2720,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
builder.setMvccReadPoint(scanner.getMvccReadPoint());
builder.setTtl(scannerLeaseTimeoutPeriod);
String scannerName = String.valueOf(scannerId);
return addScanner(scannerName, scanner, region,
!scan.isSmall() && !(request.hasLimitOfRows() && request.getLimitOfRows() > 0));
return addScanner(scannerName, scanner, region);
}
private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)

@@ -2779,7 +2776,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// return whether we have more results in region.
private boolean scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
long maxQuotaResultSize, int rows, List<Result> results, ScanResponse.Builder builder,
long maxQuotaResultSize, int maxResults, List<Result> results, ScanResponse.Builder builder,
MutableObject lastBlock, RpcCallContext context) throws IOException {
Region region = rsh.r;
RegionScanner scanner = rsh.s;

@@ -2810,8 +2807,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// correct ordering of partial results and so we prevent partial results from being
// formed.
boolean serverGuaranteesOrderOfPartials = results.isEmpty();
boolean allowPartialResults =
clientHandlesPartials && serverGuaranteesOrderOfPartials && rsh.allowPartial;
boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
boolean moreRows = false;
// Heartbeat messages occur when the processing of the ScanRequest is exceeds a

@@ -2843,7 +2839,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
contextBuilder.setTrackMetrics(trackMetrics);
ScannerContext scannerContext = contextBuilder.build();
boolean limitReached = false;
while (i < rows) {
while (i < maxResults) {
// Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
// batch limit is a limit on the number of cells per Result. Thus, if progress is
// being tracked (i.e. scannerContext.keepProgress() is true) then we need to

@@ -2855,7 +2851,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
moreRows = scanner.nextRaw(values, scannerContext);
if (!values.isEmpty()) {
Result r = Result.create(values, null, stale, scannerContext.mayHaveMoreCellsInRow());
Result r = Result.create(values, null, stale, scannerContext.hasMoreCellsInRow());
lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
results.add(r);
i++;

@@ -2863,7 +2859,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
boolean rowLimitReached = i >= rows;
boolean rowLimitReached = i >= maxResults;
limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;
if (limitReached || !moreRows) {

@@ -2920,7 +2916,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
// coprocessor postNext hook
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
}
return builder.getMoreResultsInRegion();
}

@@ -3073,8 +3069,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// with the old scan implementation where we just ignore the returned results if moreResults
// is false. Can remove the isEmpty check after we get rid of the old implementation.
moreResults = false;
} else if (limitOfRows > 0 && results.size() >= limitOfRows
&& !results.get(results.size() - 1).mayHaveMoreCellsInRow()) {
} else if (limitOfRows > 0 && !results.isEmpty() &&
!results.get(results.size() - 1).hasMoreCellsInRow() &&
ConnectionUtils.numberOfIndividualRows(results) >= limitOfRows) {
// if we have reached the limit of rows
moreResults = false;
}
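A standalone sketch of the new row-limit decision in the hunk above, not the RSRpcServices code itself; it restates the condition (the last Result closes its row and at least limitOfRows complete rows have been returned) over a plain Result list:

import java.util.List;
import org.apache.hadoop.hbase.client.Result;

public final class RowLimitCheckSketch {
  // Mirrors the condition above: report "no more results" only when the last
  // Result does not leave a row open and enough complete rows were returned.
  public static boolean reachedRowLimit(List<Result> results, int limitOfRows) {
    if (limitOfRows <= 0 || results.isEmpty()) {
      return false;
    }
    boolean lastRowClosed = !results.get(results.size() - 1).hasMoreCellsInRow();
    long completeRows = results.stream().filter(r -> !r.hasMoreCellsInRow()).count();
    return lastRowClosed && completeRows >= limitOfRows;
  }
}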
@@ -225,10 +225,10 @@ public class ScannerContext {
}
/**
* @return true when we may have more cells for the current row. This usually because we have
* reached a limit in the middle of a row
* @return true when we have more cells for the current row. This usually because we have reached
* a limit in the middle of a row
*/
boolean mayHaveMoreCellsInRow() {
boolean hasMoreCellsInRow() {
return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW ||
scannerState == NextState.TIME_LIMIT_REACHED_MID_ROW ||
scannerState == NextState.BATCH_LIMIT_REACHED;
@@ -594,36 +594,6 @@ public class TestPartialResultsFromClientSide {
scanner.close();
}
/**
* Small scans should not return partial results because it would prevent small scans from
* retrieving all of the necessary results in a single RPC request which is what makese small
* scans useful. Thus, ensure that even when {@link Scan#getAllowPartialResults()} is true, small
* scans do not return partial results
* @throws Exception
*/
@Test
public void testSmallScansDoNotAllowPartials() throws Exception {
Scan scan = new Scan();
testSmallScansDoNotAllowPartials(scan);
scan.setReversed(true);
testSmallScansDoNotAllowPartials(scan);
}
public void testSmallScansDoNotAllowPartials(Scan baseScan) throws Exception {
Scan scan = new Scan(baseScan);
scan.setAllowPartialResults(true);
scan.setSmall(true);
scan.setMaxResultSize(1);
ResultScanner scanner = TABLE.getScanner(scan);
Result r = null;
while ((r = scanner.next()) != null) {
assertFalse(r.isPartial());
}
scanner.close();
}
/**
* Make puts to put the input value into each combination of row, family, and qualifier
* @param rows
@@ -24,12 +24,14 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

@@ -72,20 +74,41 @@ public abstract class AbstractTestAsyncTableScan {
TEST_UTIL.shutdownMiniCluster();
}
protected static Scan createNormalScan() {
return new Scan();
}
protected static Scan createBatchScan() {
return new Scan().setBatch(1);
}
// set a small result size for testing flow control
protected static Scan createSmallResultSizeScan() {
return new Scan().setMaxResultSize(1);
}
protected static Scan createBatchSmallResultSizeScan() {
return new Scan().setBatch(1).setMaxResultSize(1);
}
protected static List<Pair<String, Supplier<Scan>>> getScanCreater() {
return Arrays.asList(Pair.newPair("normal", AbstractTestAsyncTableScan::createNormalScan),
Pair.newPair("batch", AbstractTestAsyncTableScan::createBatchScan),
Pair.newPair("smallResultSize", AbstractTestAsyncTableScan::createSmallResultSizeScan),
Pair.newPair("batchSmallResultSize",
AbstractTestAsyncTableScan::createBatchSmallResultSizeScan));
}
protected abstract Scan createScan();
protected abstract List<Result> doScan(Scan scan) throws Exception;
private Result convertToPartial(Result result) {
return Result.create(result.rawCells(), result.getExists(), result.isStale(), true);
}
protected final List<Result> convertFromBatchResult(List<Result> results) {
assertTrue(results.size() % 2 == 0);
return IntStream.range(0, results.size() / 2).mapToObj(i -> {
try {
return Result.createCompleteResult(Arrays.asList(convertToPartial(results.get(2 * i)),
convertToPartial(results.get(2 * i + 1))));
return Result
.createCompleteResult(Arrays.asList(results.get(2 * i), results.get(2 * i + 1)));
} catch (IOException e) {
throw new UncheckedIOException(e);
}

@@ -98,8 +121,8 @@ public abstract class AbstractTestAsyncTableScan {
// make sure all scanners are closed at RS side
TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
.forEach(rs -> assertEquals(
"The scanner count of " + rs.getServerName() + " is "
+ rs.getRSRpcServices().getScannersCount(),
"The scanner count of " + rs.getServerName() + " is " +
rs.getRSRpcServices().getScannersCount(),
0, rs.getRSRpcServices().getScannersCount()));
assertEquals(COUNT, results.size());
IntStream.range(0, COUNT).forEach(i -> {

@@ -140,61 +163,112 @@ public abstract class AbstractTestAsyncTableScan {
IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i));
}
private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive)
throws Exception {
List<Result> results = doScan(
createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
.withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive));
private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
int limit) throws Exception {
Scan scan =
createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
.withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive);
if (limit > 0) {
scan.setLimit(limit);
}
List<Result> results = doScan(scan);
int actualStart = startInclusive ? start : start + 1;
int actualStop = stopInclusive ? stop + 1 : stop;
assertEquals(actualStop - actualStart, results.size());
IntStream.range(0, actualStop - actualStart)
.forEach(i -> assertResultEquals(results.get(i), actualStart + i));
int count = actualStop - actualStart;
if (limit > 0) {
count = Math.min(count, limit);
}
assertEquals(count, results.size());
IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart + i));
}
private void testReversedScan(int start, boolean startInclusive, int stop, boolean stopInclusive)
throws Exception {
List<Result> results = doScan(createScan()
private void testReversedScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
int limit) throws Exception {
Scan scan = createScan()
.withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
.withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive).setReversed(true));
.withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive).setReversed(true);
if (limit > 0) {
scan.setLimit(limit);
}
List<Result> results = doScan(scan);
int actualStart = startInclusive ? start : start - 1;
int actualStop = stopInclusive ? stop - 1 : stop;
assertEquals(actualStart - actualStop, results.size());
IntStream.range(0, actualStart - actualStop)
.forEach(i -> assertResultEquals(results.get(i), actualStart - i));
int count = actualStart - actualStop;
if (limit > 0) {
count = Math.min(count, limit);
}
assertEquals(count, results.size());
IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart - i));
}
@Test
public void testScanWithStartKeyAndStopKey() throws Exception {
testScan(1, true, 998, false); // from first region to last region
testScan(123, true, 345, true);
testScan(234, true, 456, false);
testScan(345, false, 567, true);
testScan(456, false, 678, false);
testScan(1, true, 998, false, -1); // from first region to last region
testScan(123, true, 345, true, -1);
testScan(234, true, 456, false, -1);
testScan(345, false, 567, true, -1);
testScan(456, false, 678, false, -1);
}
@Test
public void testReversedScanWithStartKeyAndStopKey() throws Exception {
testReversedScan(998, true, 1, false); // from last region to first region
testReversedScan(543, true, 321, true);
testReversedScan(654, true, 432, false);
testReversedScan(765, false, 543, true);
testReversedScan(876, false, 654, false);
testReversedScan(998, true, 1, false, -1); // from last region to first region
testReversedScan(543, true, 321, true, -1);
testReversedScan(654, true, 432, false, -1);
testReversedScan(765, false, 543, true, -1);
testReversedScan(876, false, 654, false, -1);
}
@Test
public void testScanAtRegionBoundary() throws Exception {
testScan(222, true, 333, true);
testScan(333, true, 444, false);
testScan(444, false, 555, true);
testScan(555, false, 666, false);
testScan(222, true, 333, true, -1);
testScan(333, true, 444, false, -1);
testScan(444, false, 555, true, -1);
testScan(555, false, 666, false, -1);
}
@Test
public void testReversedScanAtRegionBoundary() throws Exception {
testReversedScan(333, true, 222, true);
testReversedScan(444, true, 333, false);
testReversedScan(555, false, 444, true);
testReversedScan(666, false, 555, false);
testReversedScan(333, true, 222, true, -1);
testReversedScan(444, true, 333, false, -1);
testReversedScan(555, false, 444, true, -1);
testReversedScan(666, false, 555, false, -1);
}
@Test
public void testScanWithLimit() throws Exception {
testScan(1, true, 998, false, 900); // from first region to last region
testScan(123, true, 345, true, 100);
testScan(234, true, 456, false, 100);
testScan(345, false, 567, true, 100);
testScan(456, false, 678, false, 100);
}
@Test
public void testScanWithLimitGreaterThanActualCount() throws Exception {
testScan(1, true, 998, false, 1000); // from first region to last region
testScan(123, true, 345, true, 200);
testScan(234, true, 456, false, 200);
testScan(345, false, 567, true, 200);
testScan(456, false, 678, false, 200);
}
@Test
public void testReversedScanWithLimit() throws Exception {
testReversedScan(998, true, 1, false, 900); // from last region to first region
testReversedScan(543, true, 321, true, 100);
testReversedScan(654, true, 432, false, 100);
testReversedScan(765, false, 543, true, 100);
testReversedScan(876, false, 654, false, 100);
}
@Test
public void testReversedScanWithLimitGreaterThanActualCount() throws Exception {
testReversedScan(998, true, 1, false, 1000); // from last region to first region
testReversedScan(543, true, 321, true, 200);
testReversedScan(654, true, 432, false, 200);
testReversedScan(765, false, 543, true, 200);
testReversedScan(876, false, 654, false, 200);
}
}
@@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.client;
import com.google.common.base.Throwables;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;

@@ -76,32 +76,16 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan {
}
}
@Parameter
@Parameter(0)
public String scanType;
@Parameter(1)
public Supplier<Scan> scanCreater;
@Parameters
@Parameters(name = "{index}: scan={0}")
public static List<Object[]> params() {
return Arrays.asList(new Supplier<?>[] { TestAsyncTableScan::createNormalScan },
new Supplier<?>[] { TestAsyncTableScan::createBatchScan },
new Supplier<?>[] { TestAsyncTableScan::createSmallResultSizeScan },
new Supplier<?>[] { TestAsyncTableScan::createBatchSmallResultSizeScan });
}
private static Scan createNormalScan() {
return new Scan();
}
private static Scan createBatchScan() {
return new Scan().setBatch(1);
}
// set a small result size for testing flow control
private static Scan createSmallResultSizeScan() {
return new Scan().setMaxResultSize(1);
}
private static Scan createBatchSmallResultSizeScan() {
return new Scan().setBatch(1).setMaxResultSize(1);
return getScanCreater().stream().map(p -> new Object[] { p.getFirst(), p.getSecond() })
.collect(Collectors.toList());
}
@Override
@@ -17,20 +17,14 @@
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@@ -61,63 +55,14 @@ public class TestAsyncTableScanAll extends AbstractTestAsyncTableScan {
return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
}
private static Scan createNormalScan() {
return new Scan();
}
// test if we can handle partial result when open scanner.
private static Scan createSmallResultSizeScan() {
return new Scan().setMaxResultSize(1);
}
@Parameters(name = "{index}: table={0}, scan={2}")
public static List<Object[]> params() {
Supplier<AsyncTableBase> rawTable = TestAsyncTableScanAll::getRawTable;
Supplier<AsyncTableBase> normalTable = TestAsyncTableScanAll::getTable;
Supplier<Scan> normalScan = TestAsyncTableScanAll::createNormalScan;
Supplier<Scan> smallResultSizeScan = TestAsyncTableScanAll::createSmallResultSizeScan;
return Arrays.asList(new Object[] { "raw", rawTable, "normal", normalScan },
new Object[] { "raw", rawTable, "smallResultSize", smallResultSizeScan },
new Object[] { "normal", normalTable, "normal", normalScan },
new Object[] { "normal", normalTable, "smallResultSize", smallResultSizeScan });
}
@Test
public void testScanWithLimit() throws InterruptedException, ExecutionException {
int start = 111;
int stop = 888;
int limit = 300;
List<Result> results = getTable.get()
.scanAll(scanCreator.get().withStartRow(Bytes.toBytes(String.format("%03d", start)))
.withStopRow(Bytes.toBytes(String.format("%03d", stop))).setLimit(limit)
.setReadType(ReadType.PREAD))
.get();
assertEquals(limit, results.size());
IntStream.range(0, limit).forEach(i -> {
Result result = results.get(i);
int actualIndex = start + i;
assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, CQ1)));
});
}
@Test
public void testReversedScanWithLimit() throws InterruptedException, ExecutionException {
int start = 888;
int stop = 111;
int limit = 300;
List<Result> results = getTable.get()
.scanAll(scanCreator.get().withStartRow(Bytes.toBytes(String.format("%03d", start)))
.withStopRow(Bytes.toBytes(String.format("%03d", stop))).setLimit(limit)
.setReadType(ReadType.PREAD).setReversed(true))
.get();
assertEquals(limit, results.size());
IntStream.range(0, limit).forEach(i -> {
Result result = results.get(i);
int actualIndex = start - i;
assertEquals(String.format("%03d", actualIndex), Bytes.toString(result.getRow()));
assertEquals(actualIndex, Bytes.toInt(result.getValue(FAMILY, CQ1)));
});
return getScanCreater().stream()
.flatMap(p -> Arrays.asList(new Object[] { "raw", rawTable, p.getFirst(), p.getSecond() },
new Object[] { "normal", normalTable, p.getFirst(), p.getSecond() }).stream())
.collect(Collectors.toList());
}
@Override

@@ -127,6 +72,10 @@ public class TestAsyncTableScanAll extends AbstractTestAsyncTableScan {
@Override
protected List<Result> doScan(Scan scan) throws Exception {
return getTable.get().scanAll(scan).get();
List<Result> results = getTable.get().scanAll(scan).get();
if (scan.getBatch() > 0) {
results = convertFromBatchResult(results);
}
return results;
}
}
@@ -18,10 +18,10 @@
package org.apache.hadoop.hbase.client;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;

@@ -35,32 +35,16 @@ import org.junit.runners.Parameterized.Parameters;
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncTableScanner extends AbstractTestAsyncTableScan {
@Parameter
@Parameter(0)
public String scanType;
@Parameter(1)
public Supplier<Scan> scanCreater;
@Parameters
@Parameters(name = "{index}: scan={0}")
public static List<Object[]> params() {
return Arrays.asList(new Supplier<?>[] { TestAsyncTableScanner::createNormalScan },
new Supplier<?>[] { TestAsyncTableScanner::createBatchScan },
new Supplier<?>[] { TestAsyncTableScanner::createSmallResultSizeScan },
new Supplier<?>[] { TestAsyncTableScanner::createBatchSmallResultSizeScan });
}
private static Scan createNormalScan() {
return new Scan();
}
private static Scan createBatchScan() {
return new Scan().setBatch(1);
}
// set a small result size for testing flow control
private static Scan createSmallResultSizeScan() {
return new Scan().setMaxResultSize(1);
}
private static Scan createBatchSmallResultSizeScan() {
return new Scan().setBatch(1).setMaxResultSize(1);
return getScanCreater().stream().map(p -> new Object[] { p.getFirst(), p.getSecond() })
.collect(Collectors.toList());
}
@Override
@@ -22,10 +22,10 @@ import com.google.common.base.Throwables;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;

@@ -94,17 +94,8 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
@Parameters(name = "{index}: type={0}")
public static List<Object[]> params() {
Supplier<Scan> normal = TestRawAsyncTableScan::createNormalScan;
Supplier<Scan> batch = TestRawAsyncTableScan::createBatchScan;
return Arrays.asList(new Object[] { "normal", normal }, new Object[] { "batch", batch });
}
private static Scan createNormalScan() {
return new Scan();
}
private static Scan createBatchScan() {
return new Scan().setBatch(1);
return getScanCreater().stream().map(p -> new Object[] { p.getFirst(), p.getSecond() })
.collect(Collectors.toList());
}
@Override
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

@@ -766,74 +765,4 @@ public class TestScannersFromClientSide {
assertEquals(expKvList.size(), result.size());
}
private void assertResultEquals(Result result, int i) {
assertEquals(String.format("%02d", i), Bytes.toString(result.getRow()));
assertEquals(i, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
}
private void testStartRowStopRowInclusive(Table table, int start, boolean startInclusive,
int stop, boolean stopInclusive) throws IOException {
int actualStart = startInclusive ? start : start + 1;
int actualStop = stopInclusive ? stop + 1 : stop;
int expectedCount = actualStop - actualStart;
Result[] results;
try (ResultScanner scanner = table.getScanner(
new Scan().withStartRow(Bytes.toBytes(String.format("%02d", start)), startInclusive)
.withStopRow(Bytes.toBytes(String.format("%02d", stop)), stopInclusive))) {
results = scanner.next(expectedCount);
}
assertEquals(expectedCount, results.length);
for (int i = 0; i < expectedCount; i++) {
assertResultEquals(results[i], actualStart + i);
}
}
private void testReversedStartRowStopRowInclusive(Table table, int start, boolean startInclusive,
int stop, boolean stopInclusive) throws IOException {
int actualStart = startInclusive ? start : start - 1;
int actualStop = stopInclusive ? stop - 1 : stop;
int expectedCount = actualStart - actualStop;
Result[] results;
try (ResultScanner scanner = table.getScanner(
new Scan().withStartRow(Bytes.toBytes(String.format("%02d", start)), startInclusive)
.withStopRow(Bytes.toBytes(String.format("%02d", stop)), stopInclusive)
.setReversed(true))) {
results = scanner.next(expectedCount);
}
assertEquals(expectedCount, results.length);
for (int i = 0; i < expectedCount; i++) {
assertResultEquals(results[i], actualStart - i);
}
}
@Test
public void testStartRowStopRowInclusive() throws IOException, InterruptedException {
TableName tableName = TableName.valueOf("testStartRowStopRowInclusive");
byte[][] splitKeys = new byte[8][];
for (int i = 11; i < 99; i += 11) {
splitKeys[i / 11 - 1] = Bytes.toBytes(String.format("%02d", i));
}
Table table = TEST_UTIL.createTable(tableName, FAMILY, splitKeys);
TEST_UTIL.waitTableAvailable(tableName);
try (BufferedMutator mutator = TEST_UTIL.getConnection().getBufferedMutator(tableName)) {
for (int i = 0; i < 100; i++) {
mutator.mutate(new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, QUALIFIER,
Bytes.toBytes(i)));
}
}
// from first region to last region
testStartRowStopRowInclusive(table, 1, true, 98, false);
testStartRowStopRowInclusive(table, 12, true, 34, true);
testStartRowStopRowInclusive(table, 23, true, 45, false);
testStartRowStopRowInclusive(table, 34, false, 56, true);
testStartRowStopRowInclusive(table, 45, false, 67, false);
// from last region to first region
testReversedStartRowStopRowInclusive(table, 98, true, 1, false);
testReversedStartRowStopRowInclusive(table, 54, true, 32, true);
testReversedStartRowStopRowInclusive(table, 65, true, 43, false);
testReversedStartRowStopRowInclusive(table, 76, false, 54, true);
testReversedStartRowStopRowInclusive(table, 87, false, 65, false);
}
}
@@ -0,0 +1,254 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
/**
* Testcase for newly added feature in HBASE-17143, such as startRow and stopRow
* inclusive/exclusive, limit for rows, etc.
*/
@RunWith(Parameterized.class)
@Category({ MediumTests.class, ClientTests.class })
public class TestScannersFromClientSide2 {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("scan");
private static byte[] FAMILY = Bytes.toBytes("cf");
private static byte[] CQ1 = Bytes.toBytes("cq1");
private static byte[] CQ2 = Bytes.toBytes("cq2");
@Parameter(0)
public boolean batch;
@Parameter(1)
public boolean smallResultSize;
@Parameter(2)
public boolean allowPartial;
@Parameters(name = "{index}: batch={0}, smallResultSize={1}, allowPartial={2}")
public static List<Object[]> params() {
List<Object[]> params = new ArrayList<>();
boolean[] values = new boolean[] { false, true };
for (int i = 0; i < 2; i++) {
for (int j = 0; j < 2; j++) {
for (int k = 0; k < 2; k++) {
params.add(new Object[] { values[i], values[j], values[k] });
}
}
}
return params;
}
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.startMiniCluster(3);
byte[][] splitKeys = new byte[8][];
for (int i = 111; i < 999; i += 111) {
splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
}
Table table = TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
List<Put> puts = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
puts.add(new Put(Bytes.toBytes(String.format("%03d", i)))
.addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)));
}
TEST_UTIL.waitTableAvailable(TABLE_NAME);
table.put(puts);
}
@AfterClass
public static void tearDown() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
private Scan createScan() {
Scan scan = new Scan();
if (batch) {
scan.setBatch(1);
}
if (smallResultSize) {
scan.setMaxResultSize(1);
}
if (allowPartial) {
scan.setAllowPartialResults(true);
}
return scan;
}
private void assertResultEquals(Result result, int i) {
assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1)));
assertEquals(i * i, Bytes.toInt(result.getValue(FAMILY, CQ2)));
}
private List<Result> doScan(Scan scan) throws IOException {
List<Result> results = new ArrayList<>();
try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME);
ResultScanner scanner = table.getScanner(scan)) {
for (Result r; (r = scanner.next()) != null;) {
results.add(r);
}
}
return assertAndCreateCompleteResults(results);
}
private List<Result> assertAndCreateCompleteResults(List<Result> results) throws IOException {
if ((!batch && !allowPartial) || (allowPartial && !batch && !smallResultSize)) {
for (Result result : results) {
assertFalse("Should not have partial result", result.hasMoreCellsInRow());
}
return results;
}
List<Result> completeResults = new ArrayList<>();
List<Result> partialResults = new ArrayList<>();
for (Result result : results) {
if (!result.hasMoreCellsInRow()) {
assertFalse("Should have partial result", partialResults.isEmpty());
partialResults.add(result);
completeResults.add(Result.createCompleteResult(partialResults));
partialResults.clear();
} else {
partialResults.add(result);
}
}
assertTrue("Should not have orphan partial result", partialResults.isEmpty());
return completeResults;
}
private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
int limit) throws Exception {
Scan scan =
createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
.withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive);
if (limit > 0) {
scan.setLimit(limit);
}
List<Result> results = doScan(scan);
int actualStart = startInclusive ? start : start + 1;
int actualStop = stopInclusive ? stop + 1 : stop;
int count = actualStop - actualStart;
if (limit > 0) {
count = Math.min(count, limit);
}
assertEquals(count, results.size());
for (int i = 0; i < count; i++) {
assertResultEquals(results.get(i), actualStart + i);
}
}
private void testReversedScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
int limit) throws Exception {
Scan scan = createScan()
.withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
.withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive).setReversed(true);
if (limit > 0) {
scan.setLimit(limit);
}
List<Result> results = doScan(scan);
int actualStart = startInclusive ? start : start - 1;
int actualStop = stopInclusive ? stop - 1 : stop;
int count = actualStart - actualStop;
if (limit > 0) {
count = Math.min(count, limit);
}
assertEquals(count, results.size());
for (int i = 0; i < count; i++) {
assertResultEquals(results.get(i), actualStart - i);
}
}
@Test
public void testScanWithLimit() throws Exception {
testScan(1, true, 998, false, 900); // from first region to last region
testScan(123, true, 345, true, 100);
testScan(234, true, 456, false, 100);
testScan(345, false, 567, true, 100);
testScan(456, false, 678, false, 100);
}
@Test
public void testScanWithLimitGreaterThanActualCount() throws Exception {
testScan(1, true, 998, false, 1000); // from first region to last region
testScan(123, true, 345, true, 200);
testScan(234, true, 456, false, 200);
testScan(345, false, 567, true, 200);
testScan(456, false, 678, false, 200);
}
public void testReversedScanWithLimit() throws Exception {
testReversedScan(998, true, 1, false, 900); // from last region to first region
testReversedScan(543, true, 321, true, 100);
testReversedScan(654, true, 432, false, 100);
testReversedScan(765, false, 543, true, 100);
testReversedScan(876, false, 654, false, 100);
}
@Test
public void testReversedScanWithLimitGreaterThanActualCount() throws Exception {
testReversedScan(998, true, 1, false, 1000); // from last region to first region
testReversedScan(543, true, 321, true, 200);
testReversedScan(654, true, 432, false, 200);
testReversedScan(765, false, 543, true, 200);
testReversedScan(876, false, 654, false, 200);
}
@Test
public void testStartRowStopRowInclusive() throws Exception {
testScan(1, true, 998, false, -1); // from first region to last region
testScan(123, true, 345, true, -1);
testScan(234, true, 456, false, -1);
testScan(345, false, 567, true, -1);
testScan(456, false, 678, false, -1);
}
@Test
public void testReversedStartRowStopRowInclusive() throws Exception {
testReversedScan(998, true, 1, false, -1); // from last region to first region
testReversedScan(543, true, 321, true, -1);
testReversedScan(654, true, 432, false, -1);
testReversedScan(765, false, 543, true, -1);
testReversedScan(876, false, 654, false, -1);
}
}