HBASE-17595 addendum fix the problem for mayHaveMoreCellsInRow
This commit is contained in:
parent
9726c71681
commit
849ab5ff29
|
@ -23,6 +23,7 @@ import java.io.IOException;
|
|||
import java.util.Arrays;
|
||||
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
|
@ -38,13 +39,23 @@ class AllowPartialScanResultCache implements ScanResultCache {
|
|||
// beginning of a row when retry.
|
||||
private Cell lastCell;
|
||||
|
||||
private void updateLastCell(Result result) {
|
||||
private boolean lastResultPartial;
|
||||
|
||||
private int numberOfCompleteRows;
|
||||
|
||||
private void recordLastResult(Result result) {
|
||||
lastCell = result.rawCells()[result.rawCells().length - 1];
|
||||
lastResultPartial = result.mayHaveMoreCellsInRow();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
|
||||
if (results.length == 0) {
|
||||
if (!isHeartbeatMessage && lastResultPartial) {
|
||||
// An empty non heartbeat result indicates that there must be a row change. So if the
|
||||
// lastResultPartial is true then we need to increase numberOfCompleteRows.
|
||||
numberOfCompleteRows++;
|
||||
}
|
||||
return EMPTY_RESULT_ARRAY;
|
||||
}
|
||||
int i;
|
||||
|
@ -58,16 +69,29 @@ class AllowPartialScanResultCache implements ScanResultCache {
|
|||
if (i == results.length) {
|
||||
return EMPTY_RESULT_ARRAY;
|
||||
}
|
||||
updateLastCell(results[results.length - 1]);
|
||||
if (i > 0) {
|
||||
return Arrays.copyOfRange(results, i, results.length);
|
||||
} else {
|
||||
return results;
|
||||
if (lastResultPartial && !CellUtil.matchingRow(lastCell, results[0].getRow())) {
|
||||
// there is a row change, so increase numberOfCompleteRows
|
||||
numberOfCompleteRows++;
|
||||
}
|
||||
recordLastResult(results[results.length - 1]);
|
||||
if (i > 0) {
|
||||
results = Arrays.copyOfRange(results, i, results.length);
|
||||
}
|
||||
for (Result result : results) {
|
||||
if (!result.mayHaveMoreCellsInRow()) {
|
||||
numberOfCompleteRows++;
|
||||
}
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
 * Nothing to release here: this implementation does not cache any results.
 */
@Override
|
||||
public void clear() {
|
||||
// we do not cache anything
|
||||
}
|
||||
|
||||
@Override
|
||||
public int numberOfCompleteRows() {
|
||||
return numberOfCompleteRows;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.Deque;
|
|||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
|
@ -45,19 +46,25 @@ public class BatchScanResultCache implements ScanResultCache {
|
|||
// beginning of a row when retry.
|
||||
private Cell lastCell;
|
||||
|
||||
private boolean lastResultPartial;
|
||||
|
||||
private final Deque<Result> partialResults = new ArrayDeque<>();
|
||||
|
||||
private int numCellsOfPartialResults;
|
||||
|
||||
private int numberOfCompleteRows;
|
||||
|
||||
/**
 * Creates the cache and stores {@code batch} in the {@code batch} field.
 * @param batch presumably the scan's batch size, i.e. the cell limit per Result (TODO confirm
 *          against callers)
 */
public BatchScanResultCache(int batch) {
|
||||
this.batch = batch;
|
||||
}
|
||||
|
||||
private void updateLastCell(Result result) {
|
||||
private void recordLastResult(Result result) {
|
||||
lastCell = result.rawCells()[result.rawCells().length - 1];
|
||||
lastResultPartial = result.mayHaveMoreCellsInRow();
|
||||
}
|
||||
|
||||
private Result createCompletedResult() throws IOException {
|
||||
numberOfCompleteRows++;
|
||||
Result result = Result.createCompleteResult(partialResults);
|
||||
partialResults.clear();
|
||||
numCellsOfPartialResults = 0;
|
||||
|
@ -104,8 +111,15 @@ public class BatchScanResultCache implements ScanResultCache {
|
|||
@Override
|
||||
public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
|
||||
if (results.length == 0) {
|
||||
if (!partialResults.isEmpty() && !isHeartbeatMessage) {
|
||||
return new Result[] { createCompletedResult() };
|
||||
if (!isHeartbeatMessage) {
|
||||
if (!partialResults.isEmpty()) {
|
||||
return new Result[] { createCompletedResult() };
|
||||
}
|
||||
if (lastResultPartial) {
|
||||
// An empty non heartbeat result indicates that there must be a row change. So if the
|
||||
// lastResultPartial is true then we need to increase numberOfCompleteRows.
|
||||
numberOfCompleteRows++;
|
||||
}
|
||||
}
|
||||
return EMPTY_RESULT_ARRAY;
|
||||
}
|
||||
|
@ -115,6 +129,17 @@ public class BatchScanResultCache implements ScanResultCache {
|
|||
if (result == null) {
|
||||
continue;
|
||||
}
|
||||
if (!partialResults.isEmpty()) {
|
||||
if (!Bytes.equals(partialResults.peek().getRow(), result.getRow())) {
|
||||
// there is a row change
|
||||
regroupedResults.add(createCompletedResult());
|
||||
}
|
||||
} else if (lastResultPartial && !CellUtil.matchingRow(lastCell, result.getRow())) {
|
||||
// As for batched scan we may return partial results to user if we reach the batch limit, so
|
||||
// here we need to use lastCell to determine if there is row change and increase
|
||||
// numberOfCompleteRows.
|
||||
numberOfCompleteRows++;
|
||||
}
|
||||
// check if we have a row change
|
||||
if (!partialResults.isEmpty() &&
|
||||
!Bytes.equals(partialResults.peek().getRow(), result.getRow())) {
|
||||
|
@ -122,9 +147,12 @@ public class BatchScanResultCache implements ScanResultCache {
|
|||
}
|
||||
Result regroupedResult = regroupResults(result);
|
||||
if (regroupedResult != null) {
|
||||
if (!regroupedResult.mayHaveMoreCellsInRow()) {
|
||||
numberOfCompleteRows++;
|
||||
}
|
||||
regroupedResults.add(regroupedResult);
|
||||
// only update last cell when we actually return it to user.
|
||||
updateLastCell(regroupedResult);
|
||||
recordLastResult(regroupedResult);
|
||||
}
|
||||
if (!result.mayHaveMoreCellsInRow() && !partialResults.isEmpty()) {
|
||||
// We are done for this row
|
||||
|
@ -139,4 +167,9 @@ public class BatchScanResultCache implements ScanResultCache {
|
|||
partialResults.clear();
|
||||
numCellsOfPartialResults = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int numberOfCompleteRows() {
|
||||
return numberOfCompleteRows;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,13 +18,11 @@
|
|||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.numberOfIndividualRows;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedList;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
|
@ -465,8 +463,11 @@ public abstract class ClientScanner extends AbstractClientScanner {
|
|||
// Groom the array of Results that we received back from the server before adding that
|
||||
// Results to the scanner's cache. If partial results are not allowed to be seen by the
|
||||
// caller, all book keeping will be performed within this method.
|
||||
int numberOfCompleteRowsBefore = scanResultCache.numberOfCompleteRows();
|
||||
Result[] resultsToAddToCache =
|
||||
scanResultCache.addAndGet(values, callable.isHeartbeatMessage());
|
||||
int numberOfCompleteRows =
|
||||
scanResultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
|
||||
if (resultsToAddToCache.length > 0) {
|
||||
for (Result rs : resultsToAddToCache) {
|
||||
cache.add(rs);
|
||||
|
@ -476,12 +477,12 @@ public abstract class ClientScanner extends AbstractClientScanner {
|
|||
countdown--;
|
||||
this.lastResult = rs;
|
||||
}
|
||||
if (scan.getLimit() > 0) {
|
||||
int newLimit =
|
||||
scan.getLimit() - numberOfIndividualRows(Arrays.asList(resultsToAddToCache));
|
||||
assert newLimit >= 0;
|
||||
scan.setLimit(newLimit);
|
||||
}
|
||||
}
|
||||
|
||||
if (scan.getLimit() > 0) {
|
||||
int newLimit = scan.getLimit() - numberOfCompleteRows;
|
||||
assert newLimit >= 0;
|
||||
scan.setLimit(newLimit);
|
||||
}
|
||||
if (scanExhausted(values)) {
|
||||
closeScanner();
|
||||
|
|
|
@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
@InterfaceAudience.Private
|
||||
class CompleteScanResultCache implements ScanResultCache {
|
||||
|
||||
private int numberOfCompleteRows;
|
||||
|
||||
private final List<Result> partialResults = new ArrayList<>();
|
||||
|
||||
private Result combine() throws IOException {
|
||||
|
@ -59,6 +61,11 @@ class CompleteScanResultCache implements ScanResultCache {
|
|||
return prependResults;
|
||||
}
|
||||
|
||||
private Result[] updateNumberOfCompleteResultsAndReturn(Result... results) {
|
||||
numberOfCompleteRows += results.length;
|
||||
return results;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
|
||||
// If no results were returned it indicates that either we have all the partial results
|
||||
|
@ -69,7 +76,7 @@ class CompleteScanResultCache implements ScanResultCache {
|
|||
// and thus there may be more partials server side that still need to be added to the partial
|
||||
// list before we form the complete Result
|
||||
if (!partialResults.isEmpty() && !isHeartbeatMessage) {
|
||||
return new Result[] { combine() };
|
||||
return updateNumberOfCompleteResultsAndReturn(combine());
|
||||
}
|
||||
return EMPTY_RESULT_ARRAY;
|
||||
}
|
||||
|
@ -79,7 +86,7 @@ class CompleteScanResultCache implements ScanResultCache {
|
|||
if (last.mayHaveMoreCellsInRow()) {
|
||||
if (partialResults.isEmpty()) {
|
||||
partialResults.add(last);
|
||||
return Arrays.copyOf(results, results.length - 1);
|
||||
return updateNumberOfCompleteResultsAndReturn(Arrays.copyOf(results, results.length - 1));
|
||||
}
|
||||
// We have only one result and it is partial
|
||||
if (results.length == 1) {
|
||||
|
@ -90,21 +97,26 @@ class CompleteScanResultCache implements ScanResultCache {
|
|||
}
|
||||
Result completeResult = combine();
|
||||
partialResults.add(last);
|
||||
return new Result[] { completeResult };
|
||||
return updateNumberOfCompleteResultsAndReturn(completeResult);
|
||||
}
|
||||
// We have some complete results
|
||||
Result[] resultsToReturn = prependCombined(results, results.length - 1);
|
||||
partialResults.add(last);
|
||||
return resultsToReturn;
|
||||
return updateNumberOfCompleteResultsAndReturn(resultsToReturn);
|
||||
}
|
||||
if (!partialResults.isEmpty()) {
|
||||
return prependCombined(results, results.length);
|
||||
return updateNumberOfCompleteResultsAndReturn(prependCombined(results, results.length));
|
||||
}
|
||||
return results;
|
||||
return updateNumberOfCompleteResultsAndReturn(results);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
partialResults.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int numberOfCompleteRows() {
|
||||
return numberOfCompleteRows;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,7 +25,6 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
|
@ -303,29 +302,6 @@ public class ConnectionUtils {
|
|||
return Bytes.compareTo(info.getStartKey(), scan.getStopRow()) <= 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Count the individual rows for the given result list.
|
||||
* <p>
|
||||
* There are two reason why we need to use this method instead of a simple {@code results.length}.
|
||||
* <ol>
|
||||
* <li>Server may return only part of the whole cells of a row for the last result, and if
|
||||
* allowPartial is true, we will return the array to user directly. We should not count the last
|
||||
* result.</li>
|
||||
* <li>If this is a batched scan, a row may be split into several results, but they should be
|
||||
* counted as one row. For example, a row with 15 cells will be split into 3 results with 5 cells
|
||||
* each if {@code scan.getBatch()} is 5.</li>
|
||||
* </ol>
|
||||
*/
|
||||
public static int numberOfIndividualRows(List<Result> results) {
|
||||
int count = 0;
|
||||
for (Result result : results) {
|
||||
if (!result.mayHaveMoreCellsInRow()) {
|
||||
count++;
|
||||
}
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
public static ScanResultCache createScanResultCache(Scan scan) {
|
||||
if (scan.getAllowPartialResults()) {
|
||||
return new AllowPartialScanResultCache();
|
||||
|
|
|
@ -1149,8 +1149,6 @@ public class Scan extends Query {
|
|||
* reaches this value.
|
||||
* <p>
|
||||
* This condition will be tested at last, after all other conditions such as stopRow, filter, etc.
|
||||
* <p>
|
||||
* Can not be used together with batch and allowPartial.
|
||||
* @param limit the limit of rows for this scan
|
||||
* @return this
|
||||
*/
|
||||
|
|
|
@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
* <ol>
|
||||
* <li>Get results from ScanResponse proto.</li>
|
||||
* <li>Pass them to ScanResultCache and get something back.</li>
|
||||
* <li>If we actually get something back, then pass it to ScanObserver.</li>
|
||||
* <li>If we actually get something back, then pass it to ScanConsumer.</li>
|
||||
* </ol>
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
|
@ -50,4 +50,9 @@ interface ScanResultCache {
|
|||
* again.
|
||||
*/
|
||||
void clear();
|
||||
|
||||
/**
|
||||
* Return the number of complete rows. Used to implement limited scan.
|
||||
*/
|
||||
int numberOfCompleteRows();
|
||||
}
|
||||
|
|
|
@ -6076,7 +6076,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// If the size limit was reached it means a partial Result is being returned. Returning a
|
||||
// partial Result means that we should not reset the filters; filters should only be reset in
|
||||
// between rows
|
||||
if (!scannerContext.hasMoreCellsInRow()) {
|
||||
if (!scannerContext.mayHaveMoreCellsInRow()) {
|
||||
resetFilters();
|
||||
}
|
||||
|
||||
|
|
|
@ -274,6 +274,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
private final String scannerName;
|
||||
private final RegionScanner s;
|
||||
private final Region r;
|
||||
private byte[] rowOfLastPartialResult;
|
||||
|
||||
public RegionScannerHolder(String scannerName, RegionScanner s, Region r) {
|
||||
this.scannerName = scannerName;
|
||||
|
@ -2558,9 +2559,20 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
return -1L;
|
||||
}
|
||||
|
||||
private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
|
||||
ScannerContext scannerContext, ScanResponse.Builder builder) {
|
||||
if (numOfCompleteRows >= limitOfRows) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows +
|
||||
" scannerContext: " + scannerContext);
|
||||
}
|
||||
builder.setMoreResults(false);
|
||||
}
|
||||
}
|
||||
|
||||
// return whether we have more results in region.
|
||||
private boolean scan(HBaseRpcController controller, ScanRequest request,
|
||||
RegionScannerHolder rsh, long maxQuotaResultSize, int maxResults, List<Result> results,
|
||||
private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
|
||||
long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
|
||||
ScanResponse.Builder builder, MutableObject lastBlock, RpcCallContext context)
|
||||
throws IOException {
|
||||
Region region = rsh.r;
|
||||
|
@ -2577,7 +2589,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
List<Cell> values = new ArrayList<Cell>(32);
|
||||
region.startRegionOperation(Operation.SCAN);
|
||||
try {
|
||||
int i = 0;
|
||||
int numOfResults = 0;
|
||||
int numOfCompleteRows = 0;
|
||||
long before = EnvironmentEdgeManager.currentTime();
|
||||
synchronized (scanner) {
|
||||
boolean stale = (region.getRegionInfo().getReplicaId() != 0);
|
||||
|
@ -2622,7 +2635,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
contextBuilder.setTrackMetrics(trackMetrics);
|
||||
ScannerContext scannerContext = contextBuilder.build();
|
||||
boolean limitReached = false;
|
||||
while (i < maxResults) {
|
||||
while (numOfResults < maxResults) {
|
||||
// Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
|
||||
// batch limit is a limit on the number of cells per Result. Thus, if progress is
|
||||
// being tracked (i.e. scannerContext.keepProgress() is true) then we need to
|
||||
|
@ -2634,16 +2647,46 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
moreRows = scanner.nextRaw(values, scannerContext);
|
||||
|
||||
if (!values.isEmpty()) {
|
||||
Result r = Result.create(values, null, stale, scannerContext.hasMoreCellsInRow());
|
||||
if (limitOfRows > 0) {
|
||||
// First we need to check if the last result is partial and we have a row change. If
|
||||
// so then we need to increase the numOfCompleteRows.
|
||||
if (results.isEmpty()) {
|
||||
if (rsh.rowOfLastPartialResult != null &&
|
||||
!CellUtil.matchingRow(values.get(0), rsh.rowOfLastPartialResult)) {
|
||||
numOfCompleteRows++;
|
||||
checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
|
||||
builder);
|
||||
}
|
||||
} else {
|
||||
Result lastResult = results.get(results.size() - 1);
|
||||
if (lastResult.mayHaveMoreCellsInRow() &&
|
||||
!CellUtil.matchingRow(values.get(0), lastResult.getRow())) {
|
||||
numOfCompleteRows++;
|
||||
checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
|
||||
builder);
|
||||
}
|
||||
}
|
||||
if (builder.hasMoreResults() && !builder.getMoreResults()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
|
||||
Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
|
||||
lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
|
||||
results.add(r);
|
||||
i++;
|
||||
numOfResults++;
|
||||
if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
|
||||
numOfCompleteRows++;
|
||||
checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
|
||||
if (builder.hasMoreResults() && !builder.getMoreResults()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
|
||||
boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
|
||||
boolean rowLimitReached = i >= maxResults;
|
||||
limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;
|
||||
boolean resultsLimitReached = numOfResults >= maxResults;
|
||||
limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;
|
||||
|
||||
if (limitReached || !moreRows) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
|
@ -2669,7 +2712,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
// We didn't get a single batch
|
||||
builder.setMoreResultsInRegion(false);
|
||||
}
|
||||
|
||||
// Check to see if the client requested that we track metrics server side. If the
|
||||
// client requested metrics, retrieve the metrics from the scanner context.
|
||||
if (trackMetrics) {
|
||||
|
@ -2686,7 +2728,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
builder.setScanMetrics(metricBuilder.build());
|
||||
}
|
||||
}
|
||||
region.updateReadRequestsCount(i);
|
||||
region.updateReadRequestsCount(numOfResults);
|
||||
long end = EnvironmentEdgeManager.currentTime();
|
||||
long responseCellSize = context != null ? context.getResponseCellSize() : 0;
|
||||
region.getMetrics().updateScanTime(end - before);
|
||||
|
@ -2701,7 +2743,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
if (region.getCoprocessorHost() != null) {
|
||||
region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
|
||||
}
|
||||
return builder.getMoreResultsInRegion();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2809,14 +2850,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
// now let's do the real scan.
|
||||
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
|
||||
RegionScanner scanner = rsh.s;
|
||||
boolean moreResults = true;
|
||||
boolean moreResultsInRegion = true;
|
||||
// this is the limit of rows for this scan, if the number of rows reaches this value, we will
|
||||
// close the scanner.
|
||||
int limitOfRows;
|
||||
if (request.hasLimitOfRows()) {
|
||||
limitOfRows = request.getLimitOfRows();
|
||||
rows = Math.min(rows, limitOfRows);
|
||||
} else {
|
||||
limitOfRows = -1;
|
||||
}
|
||||
|
@ -2839,32 +2877,44 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
}
|
||||
}
|
||||
if (!done) {
|
||||
moreResultsInRegion = scan((HBaseRpcController) controller, request, rsh,
|
||||
maxQuotaResultSize, rows, results, builder, lastBlock, context);
|
||||
scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
|
||||
results, builder, lastBlock, context);
|
||||
}
|
||||
}
|
||||
|
||||
quota.addScanResult(results);
|
||||
|
||||
addResults(builder, results, (HBaseRpcController) controller,
|
||||
RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
|
||||
if (scanner.isFilterDone() && results.isEmpty()) {
|
||||
// If the scanner's filter - if any - is done with the scan
|
||||
// only set moreResults to false if the results is empty. This is used to keep compatible
|
||||
// with the old scan implementation where we just ignore the returned results if moreResults
|
||||
// is false. Can remove the isEmpty check after we get rid of the old implementation.
|
||||
moreResults = false;
|
||||
} else if (limitOfRows > 0 && !results.isEmpty() &&
|
||||
!results.get(results.size() - 1).mayHaveMoreCellsInRow() &&
|
||||
ConnectionUtils.numberOfIndividualRows(results) >= limitOfRows) {
|
||||
// if we have reached the limit of rows
|
||||
moreResults = false;
|
||||
builder.setMoreResults(false);
|
||||
}
|
||||
addResults(builder, results, (HBaseRpcController) controller,
|
||||
RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
|
||||
if (!moreResults || !moreResultsInRegion || closeScanner) {
|
||||
// we only set moreResults to false in the above code, so set it to true if we haven't set it
|
||||
// yet.
|
||||
if (!builder.hasMoreResults()) {
|
||||
builder.setMoreResults(true);
|
||||
}
|
||||
if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) {
|
||||
// Record the row of the last result if it is a partial result
|
||||
// We need this to calculate the complete rows we have returned to client as the
|
||||
// mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the
|
||||
// current row. We may filter out all the remaining cells for the current row and just
|
||||
// return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to
|
||||
// check for row change.
|
||||
Result lastResult = results.get(results.size() - 1);
|
||||
if (lastResult.mayHaveMoreCellsInRow()) {
|
||||
rsh.rowOfLastPartialResult = lastResult.getRow();
|
||||
} else {
|
||||
rsh.rowOfLastPartialResult = null;
|
||||
}
|
||||
}
|
||||
if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
|
||||
scannerClosed = true;
|
||||
closeScanner(region, scanner, scannerName, context);
|
||||
}
|
||||
builder.setMoreResults(moreResults);
|
||||
return builder.build();
|
||||
} catch (Exception e) {
|
||||
try {
|
||||
|
|
|
@ -221,7 +221,7 @@ public class ScannerContext {
|
|||
* @return true when we have more cells for the current row. This usually because we have reached
|
||||
* a limit in the middle of a row
|
||||
*/
|
||||
boolean hasMoreCellsInRow() {
|
||||
boolean mayHaveMoreCellsInRow() {
|
||||
return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW ||
|
||||
scannerState == NextState.TIME_LIMIT_REACHED_MID_ROW ||
|
||||
scannerState == NextState.BATCH_LIMIT_REACHED;
|
||||
|
|
|
@ -0,0 +1,58 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.filter.FilterBase;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public final class ColumnCountOnRowFilter extends FilterBase {
|
||||
|
||||
private final int limit;
|
||||
|
||||
private int count = 0;
|
||||
|
||||
public ColumnCountOnRowFilter(int limit) {
|
||||
this.limit = limit;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReturnCode filterKeyValue(Cell v) throws IOException {
|
||||
count++;
|
||||
return count > limit ? ReturnCode.NEXT_ROW : ReturnCode.INCLUDE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() throws IOException {
|
||||
this.count = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] toByteArray() throws IOException {
|
||||
return Bytes.toBytes(limit);
|
||||
}
|
||||
|
||||
public static ColumnCountOnRowFilter parseFrom(byte[] bytes) throws DeserializationException {
|
||||
return new ColumnCountOnRowFilter(Bytes.toInt(bytes));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,177 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
/**
|
||||
* With filter we may stop at a middle of row and think that we still have more cells for the
|
||||
* current row but actually all the remaining cells will be filtered out by the filter. So it will
|
||||
* lead to a Result that mayHaveMoreCellsInRow is true but actually there are no cells for the same
|
||||
* row. Here we want to test if our limited scan still works.
|
||||
*/
|
||||
@Category({ MediumTests.class, ClientTests.class })
|
||||
public class TestLimitedScanWithFilter {
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static final TableName TABLE_NAME = TableName.valueOf("TestRegionScanner");
|
||||
|
||||
private static final byte[] FAMILY = Bytes.toBytes("cf");
|
||||
|
||||
private static final byte[][] CQS =
|
||||
{ Bytes.toBytes("cq1"), Bytes.toBytes("cq2"), Bytes.toBytes("cq3"), Bytes.toBytes("cq4") };
|
||||
|
||||
private static int ROW_COUNT = 10;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
UTIL.startMiniCluster(1);
|
||||
try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
|
||||
for (int i = 0; i < ROW_COUNT; i++) {
|
||||
Put put = new Put(Bytes.toBytes(i));
|
||||
for (int j = 0; j < CQS.length; j++) {
|
||||
put.addColumn(FAMILY, CQS[j], Bytes.toBytes((j + 1) * i));
|
||||
}
|
||||
table.put(put);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompleteResult() throws IOException {
|
||||
int limit = 5;
|
||||
Scan scan =
|
||||
new Scan().setFilter(new ColumnCountOnRowFilter(2)).setMaxResultSize(1).setLimit(limit);
|
||||
try (Table table = UTIL.getConnection().getTable(TABLE_NAME);
|
||||
ResultScanner scanner = table.getScanner(scan)) {
|
||||
for (int i = 0; i < limit; i++) {
|
||||
Result result = scanner.next();
|
||||
assertEquals(i, Bytes.toInt(result.getRow()));
|
||||
assertEquals(2, result.size());
|
||||
assertFalse(result.mayHaveMoreCellsInRow());
|
||||
assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQS[0])));
|
||||
assertEquals(2 * i, Bytes.toInt(result.getValue(FAMILY, CQS[1])));
|
||||
}
|
||||
assertNull(scanner.next());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllowPartial() throws IOException {
|
||||
int limit = 5;
|
||||
Scan scan = new Scan().setFilter(new ColumnCountOnRowFilter(2)).setMaxResultSize(1)
|
||||
.setAllowPartialResults(true).setLimit(limit);
|
||||
try (Table table = UTIL.getConnection().getTable(TABLE_NAME);
|
||||
ResultScanner scanner = table.getScanner(scan)) {
|
||||
for (int i = 0; i < 2 * limit; i++) {
|
||||
int key = i / 2;
|
||||
Result result = scanner.next();
|
||||
assertEquals(key, Bytes.toInt(result.getRow()));
|
||||
assertEquals(1, result.size());
|
||||
assertTrue(result.mayHaveMoreCellsInRow());
|
||||
int cqIndex = i % 2;
|
||||
assertEquals(key * (cqIndex + 1), Bytes.toInt(result.getValue(FAMILY, CQS[cqIndex])));
|
||||
}
|
||||
assertNull(scanner.next());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBatchAllowPartial() throws IOException {
|
||||
int limit = 5;
|
||||
Scan scan = new Scan().setFilter(new ColumnCountOnRowFilter(3)).setBatch(2).setMaxResultSize(1)
|
||||
.setAllowPartialResults(true).setLimit(limit);
|
||||
try (Table table = UTIL.getConnection().getTable(TABLE_NAME);
|
||||
ResultScanner scanner = table.getScanner(scan)) {
|
||||
for (int i = 0; i < 3 * limit; i++) {
|
||||
int key = i / 3;
|
||||
Result result = scanner.next();
|
||||
assertEquals(key, Bytes.toInt(result.getRow()));
|
||||
assertEquals(1, result.size());
|
||||
assertTrue(result.mayHaveMoreCellsInRow());
|
||||
int cqIndex = i % 3;
|
||||
assertEquals(key * (cqIndex + 1), Bytes.toInt(result.getValue(FAMILY, CQS[cqIndex])));
|
||||
}
|
||||
assertNull(scanner.next());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBatch() throws IOException {
|
||||
int limit = 5;
|
||||
Scan scan = new Scan().setFilter(new ColumnCountOnRowFilter(2)).setBatch(2).setMaxResultSize(1)
|
||||
.setLimit(limit);
|
||||
try (Table table = UTIL.getConnection().getTable(TABLE_NAME);
|
||||
ResultScanner scanner = table.getScanner(scan)) {
|
||||
for (int i = 0; i < limit; i++) {
|
||||
Result result = scanner.next();
|
||||
assertEquals(i, Bytes.toInt(result.getRow()));
|
||||
assertEquals(2, result.size());
|
||||
assertTrue(result.mayHaveMoreCellsInRow());
|
||||
assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQS[0])));
|
||||
assertEquals(2 * i, Bytes.toInt(result.getValue(FAMILY, CQS[1])));
|
||||
}
|
||||
assertNull(scanner.next());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBatchAndFilterDiffer() throws IOException {
|
||||
int limit = 5;
|
||||
Scan scan = new Scan().setFilter(new ColumnCountOnRowFilter(3)).setBatch(2).setMaxResultSize(1)
|
||||
.setLimit(limit);
|
||||
try (Table table = UTIL.getConnection().getTable(TABLE_NAME);
|
||||
ResultScanner scanner = table.getScanner(scan)) {
|
||||
for (int i = 0; i < limit; i++) {
|
||||
Result result = scanner.next();
|
||||
assertEquals(i, Bytes.toInt(result.getRow()));
|
||||
assertEquals(2, result.size());
|
||||
assertTrue(result.mayHaveMoreCellsInRow());
|
||||
assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQS[0])));
|
||||
assertEquals(2 * i, Bytes.toInt(result.getValue(FAMILY, CQS[1])));
|
||||
result = scanner.next();
|
||||
assertEquals(i, Bytes.toInt(result.getRow()));
|
||||
assertEquals(1, result.size());
|
||||
assertFalse(result.mayHaveMoreCellsInRow());
|
||||
assertEquals(3 * i, Bytes.toInt(result.getValue(FAMILY, CQS[2])));
|
||||
}
|
||||
assertNull(scanner.next());
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue