From 849ab5ff2998192d4f21d49f8356cc9a4370743a Mon Sep 17 00:00:00 2001 From: zhangduo Date: Thu, 23 Mar 2017 15:47:26 +0800 Subject: [PATCH] HBASE-17595 addendum fix the problem for mayHaveMoreCellsInRow --- .../client/AllowPartialScanResultCache.java | 36 +++- .../hbase/client/BatchScanResultCache.java | 41 +++- .../hadoop/hbase/client/ClientScanner.java | 17 +- .../hbase/client/CompleteScanResultCache.java | 24 ++- .../hadoop/hbase/client/ConnectionUtils.java | 24 --- .../org/apache/hadoop/hbase/client/Scan.java | 2 - .../hadoop/hbase/client/ScanResultCache.java | 7 +- .../hadoop/hbase/regionserver/HRegion.java | 2 +- .../hbase/regionserver/RSRpcServices.java | 106 ++++++++--- .../hbase/regionserver/ScannerContext.java | 2 +- .../hbase/client/ColumnCountOnRowFilter.java | 58 ++++++ .../client/TestLimitedScanWithFilter.java | 177 ++++++++++++++++++ 12 files changed, 415 insertions(+), 81 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/ColumnCountOnRowFilter.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLimitedScanWithFilter.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java index 82f1ea0c624..5b6c411dfe2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.Arrays; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; /** @@ -38,13 +39,23 @@ class AllowPartialScanResultCache implements ScanResultCache { // beginning of a row when retry. private Cell lastCell; - private void updateLastCell(Result result) { + private boolean lastResultPartial; + + private int numberOfCompleteRows; + + private void recordLastResult(Result result) { lastCell = result.rawCells()[result.rawCells().length - 1]; + lastResultPartial = result.mayHaveMoreCellsInRow(); } @Override public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException { if (results.length == 0) { + if (!isHeartbeatMessage && lastResultPartial) { + // An empty non heartbeat result indicate that there must be a row change. So if the + // lastResultPartial is true then we need to increase numberOfCompleteRows. + numberOfCompleteRows++; + } return EMPTY_RESULT_ARRAY; } int i; @@ -58,16 +69,29 @@ class AllowPartialScanResultCache implements ScanResultCache { if (i == results.length) { return EMPTY_RESULT_ARRAY; } - updateLastCell(results[results.length - 1]); - if (i > 0) { - return Arrays.copyOfRange(results, i, results.length); - } else { - return results; + if (lastResultPartial && !CellUtil.matchingRow(lastCell, results[0].getRow())) { + // there is a row change, so increase numberOfCompleteRows + numberOfCompleteRows++; } + recordLastResult(results[results.length - 1]); + if (i > 0) { + results = Arrays.copyOfRange(results, i, results.length); + } + for (Result result : results) { + if (!result.mayHaveMoreCellsInRow()) { + numberOfCompleteRows++; + } + } + return results; } @Override public void clear() { // we do not cache anything } + + @Override + public int numberOfCompleteRows() { + return numberOfCompleteRows; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java index 9ab959b395c..293f4117dc0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java @@ -26,6 +26,7 @@ import java.util.Deque; import java.util.List; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; @@ -45,19 +46,25 @@ public class BatchScanResultCache implements ScanResultCache { // beginning of a row when retry. private Cell lastCell; + private boolean lastResultPartial; + private final Deque partialResults = new ArrayDeque<>(); private int numCellsOfPartialResults; + private int numberOfCompleteRows; + public BatchScanResultCache(int batch) { this.batch = batch; } - private void updateLastCell(Result result) { + private void recordLastResult(Result result) { lastCell = result.rawCells()[result.rawCells().length - 1]; + lastResultPartial = result.mayHaveMoreCellsInRow(); } private Result createCompletedResult() throws IOException { + numberOfCompleteRows++; Result result = Result.createCompleteResult(partialResults); partialResults.clear(); numCellsOfPartialResults = 0; @@ -104,8 +111,15 @@ public class BatchScanResultCache implements ScanResultCache { @Override public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException { if (results.length == 0) { - if (!partialResults.isEmpty() && !isHeartbeatMessage) { - return new Result[] { createCompletedResult() }; + if (!isHeartbeatMessage) { + if (!partialResults.isEmpty()) { + return new Result[] { createCompletedResult() }; + } + if (lastResultPartial) { + // An empty non heartbeat result indicate that there must be a row change. So if the + // lastResultPartial is true then we need to increase numberOfCompleteRows. + numberOfCompleteRows++; + } } return EMPTY_RESULT_ARRAY; } @@ -115,6 +129,17 @@ public class BatchScanResultCache implements ScanResultCache { if (result == null) { continue; } + if (!partialResults.isEmpty()) { + if (!Bytes.equals(partialResults.peek().getRow(), result.getRow())) { + // there is a row change + regroupedResults.add(createCompletedResult()); + } + } else if (lastResultPartial && !CellUtil.matchingRow(lastCell, result.getRow())) { + // As for batched scan we may return partial results to user if we reach the batch limit, so + // here we need to use lastCell to determine if there is row change and increase + // numberOfCompleteRows. + numberOfCompleteRows++; + } // check if we have a row change if (!partialResults.isEmpty() && !Bytes.equals(partialResults.peek().getRow(), result.getRow())) { @@ -122,9 +147,12 @@ public class BatchScanResultCache implements ScanResultCache { } Result regroupedResult = regroupResults(result); if (regroupedResult != null) { + if (!regroupedResult.mayHaveMoreCellsInRow()) { + numberOfCompleteRows++; + } regroupedResults.add(regroupedResult); // only update last cell when we actually return it to user. - updateLastCell(regroupedResult); + recordLastResult(regroupedResult); } if (!result.mayHaveMoreCellsInRow() && !partialResults.isEmpty()) { // We are done for this row @@ -139,4 +167,9 @@ public class BatchScanResultCache implements ScanResultCache { partialResults.clear(); numCellsOfPartialResults = 0; } + + @Override + public int numberOfCompleteRows() { + return numberOfCompleteRows; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index abcb67ea070..8e94c7c8ad2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -18,13 +18,11 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache; -import static org.apache.hadoop.hbase.client.ConnectionUtils.numberOfIndividualRows; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.io.InterruptedIOException; -import java.util.Arrays; import java.util.LinkedList; import java.util.concurrent.ExecutorService; @@ -465,8 +463,11 @@ public abstract class ClientScanner extends AbstractClientScanner { // Groom the array of Results that we received back from the server before adding that // Results to the scanner's cache. If partial results are not allowed to be seen by the // caller, all book keeping will be performed within this method. + int numberOfCompleteRowsBefore = scanResultCache.numberOfCompleteRows(); Result[] resultsToAddToCache = scanResultCache.addAndGet(values, callable.isHeartbeatMessage()); + int numberOfCompleteRows = + scanResultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore; if (resultsToAddToCache.length > 0) { for (Result rs : resultsToAddToCache) { cache.add(rs); @@ -476,12 +477,12 @@ public abstract class ClientScanner extends AbstractClientScanner { countdown--; this.lastResult = rs; } - if (scan.getLimit() > 0) { - int newLimit = - scan.getLimit() - numberOfIndividualRows(Arrays.asList(resultsToAddToCache)); - assert newLimit >= 0; - scan.setLimit(newLimit); - } + } + + if (scan.getLimit() > 0) { + int newLimit = scan.getLimit() - numberOfCompleteRows; + assert newLimit >= 0; + scan.setLimit(newLimit); } if (scanExhausted(values)) { closeScanner(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java index e09ddfb7058..a132642d8b0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java @@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.util.Bytes; @InterfaceAudience.Private class CompleteScanResultCache implements ScanResultCache { + private int numberOfCompleteRows; + private final List partialResults = new ArrayList<>(); private Result combine() throws IOException { @@ -59,6 +61,11 @@ class CompleteScanResultCache implements ScanResultCache { return prependResults; } + private Result[] updateNumberOfCompleteResultsAndReturn(Result... results) { + numberOfCompleteRows += results.length; + return results; + } + @Override public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException { // If no results were returned it indicates that either we have the all the partial results @@ -69,7 +76,7 @@ class CompleteScanResultCache implements ScanResultCache { // and thus there may be more partials server side that still need to be added to the partial // list before we form the complete Result if (!partialResults.isEmpty() && !isHeartbeatMessage) { - return new Result[] { combine() }; + return updateNumberOfCompleteResultsAndReturn(combine()); } return EMPTY_RESULT_ARRAY; } @@ -79,7 +86,7 @@ class CompleteScanResultCache implements ScanResultCache { if (last.mayHaveMoreCellsInRow()) { if (partialResults.isEmpty()) { partialResults.add(last); - return Arrays.copyOf(results, results.length - 1); + return updateNumberOfCompleteResultsAndReturn(Arrays.copyOf(results, results.length - 1)); } // We have only one result and it is partial if (results.length == 1) { @@ -90,21 +97,26 @@ class CompleteScanResultCache implements ScanResultCache { } Result completeResult = combine(); partialResults.add(last); - return new Result[] { completeResult }; + return updateNumberOfCompleteResultsAndReturn(completeResult); } // We have some complete results Result[] resultsToReturn = prependCombined(results, results.length - 1); partialResults.add(last); - return resultsToReturn; + return updateNumberOfCompleteResultsAndReturn(resultsToReturn); } if (!partialResults.isEmpty()) { - return prependCombined(results, results.length); + return updateNumberOfCompleteResultsAndReturn(prependCombined(results, results.length)); } - return results; + return updateNumberOfCompleteResultsAndReturn(results); } @Override public void clear() { partialResults.clear(); } + + @Override + public int numberOfCompleteRows() { + return numberOfCompleteRows; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 1bdc5fec82f..71556597b26 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -25,7 +25,6 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.Arrays; import java.util.Comparator; -import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadLocalRandom; @@ -303,29 +302,6 @@ public class ConnectionUtils { return Bytes.compareTo(info.getStartKey(), scan.getStopRow()) <= 0; } - /** - * Count the individual rows for the given result list. - *

- * There are two reason why we need to use this method instead of a simple {@code results.length}. - *

    - *
  1. Server may return only part of the whole cells of a row for the last result, and if - * allowPartial is true, we will return the array to user directly. We should not count the last - * result.
  2. - *
  3. If this is a batched scan, a row may be split into several results, but they should be - * counted as one row. For example, a row with 15 cells will be split into 3 results with 5 cells - * each if {@code scan.getBatch()} is 5.
  4. - *
- */ - public static int numberOfIndividualRows(List results) { - int count = 0; - for (Result result : results) { - if (!result.mayHaveMoreCellsInRow()) { - count++; - } - } - return count; - } - public static ScanResultCache createScanResultCache(Scan scan) { if (scan.getAllowPartialResults()) { return new AllowPartialScanResultCache(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index 9ce40e85844..26076f5d4c5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -1149,8 +1149,6 @@ public class Scan extends Query { * reaches this value. *

* This condition will be tested at last, after all other conditions such as stopRow, filter, etc. - *

- * Can not be used together with batch and allowPartial. * @param limit the limit of rows for this scan * @return this */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java index 2366b5788eb..2d28e1a5d26 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultCache.java @@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; *

    *
  1. Get results from ScanResponse proto.
  2. *
  3. Pass them to ScanResultCache and get something back.
  4. - *
  5. If we actually get something back, then pass it to ScanObserver.
  6. + *
  7. If we actually get something back, then pass it to ScanConsumer.
  8. *
*/ @InterfaceAudience.Private @@ -50,4 +50,9 @@ interface ScanResultCache { * again. */ void clear(); + + /** + * Return the number of complete rows. Used to implement limited scan. + */ + int numberOfCompleteRows(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 2ba1ce95136..e7e65a6a046 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -6076,7 +6076,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // If the size limit was reached it means a partial Result is being returned. Returning a // partial Result means that we should not reset the filters; filters should only be reset in // between rows - if (!scannerContext.hasMoreCellsInRow()) { + if (!scannerContext.mayHaveMoreCellsInRow()) { resetFilters(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 49ce348ac36..27f54205a7c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -274,6 +274,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, private final String scannerName; private final RegionScanner s; private final Region r; + private byte[] rowOfLastPartialResult; public RegionScannerHolder(String scannerName, RegionScanner s, Region r) { this.scannerName = scannerName; @@ -2558,9 +2559,20 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return -1L; } + private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows, + ScannerContext scannerContext, ScanResponse.Builder builder) { + if (numOfCompleteRows >= limitOfRows) { + if (LOG.isTraceEnabled()) { + LOG.trace("Done scanning, limit of rows reached, moreRows: " + moreRows + + " scannerContext: " + scannerContext); + } + builder.setMoreResults(false); + } + } + // return whether we have more results in region. - private boolean scan(HBaseRpcController controller, ScanRequest request, - RegionScannerHolder rsh, long maxQuotaResultSize, int maxResults, List results, + private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh, + long maxQuotaResultSize, int maxResults, int limitOfRows, List results, ScanResponse.Builder builder, MutableObject lastBlock, RpcCallContext context) throws IOException { Region region = rsh.r; @@ -2577,7 +2589,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, List values = new ArrayList(32); region.startRegionOperation(Operation.SCAN); try { - int i = 0; + int numOfResults = 0; + int numOfCompleteRows = 0; long before = EnvironmentEdgeManager.currentTime(); synchronized (scanner) { boolean stale = (region.getRegionInfo().getReplicaId() != 0); @@ -2622,7 +2635,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, contextBuilder.setTrackMetrics(trackMetrics); ScannerContext scannerContext = contextBuilder.build(); boolean limitReached = false; - while (i < maxResults) { + while (numOfResults < maxResults) { // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The // batch limit is a limit on the number of cells per Result. Thus, if progress is // being tracked (i.e. scannerContext.keepProgress() is true) then we need to @@ -2634,16 +2647,46 @@ public class RSRpcServices implements HBaseRPCErrorHandler, moreRows = scanner.nextRaw(values, scannerContext); if (!values.isEmpty()) { - Result r = Result.create(values, null, stale, scannerContext.hasMoreCellsInRow()); + if (limitOfRows > 0) { + // First we need to check if the last result is partial and we have a row change. If + // so then we need to increase the numOfCompleteRows. + if (results.isEmpty()) { + if (rsh.rowOfLastPartialResult != null && + !CellUtil.matchingRow(values.get(0), rsh.rowOfLastPartialResult)) { + numOfCompleteRows++; + checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, + builder); + } + } else { + Result lastResult = results.get(results.size() - 1); + if (lastResult.mayHaveMoreCellsInRow() && + !CellUtil.matchingRow(values.get(0), lastResult.getRow())) { + numOfCompleteRows++; + checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, + builder); + } + } + if (builder.hasMoreResults() && !builder.getMoreResults()) { + break; + } + } + boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow(); + Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow); lastBlock.setValue(addSize(context, r, lastBlock.getValue())); results.add(r); - i++; + numOfResults++; + if (!mayHaveMoreCellsInRow && limitOfRows > 0) { + numOfCompleteRows++; + checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder); + if (builder.hasMoreResults() && !builder.getMoreResults()) { + break; + } + } } - boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS); boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS); - boolean rowLimitReached = i >= maxResults; - limitReached = sizeLimitReached || timeLimitReached || rowLimitReached; + boolean resultsLimitReached = numOfResults >= maxResults; + limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached; if (limitReached || !moreRows) { if (LOG.isTraceEnabled()) { @@ -2669,7 +2712,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // We didn't get a single batch builder.setMoreResultsInRegion(false); } - // Check to see if the client requested that we track metrics server side. If the // client requested metrics, retrieve the metrics from the scanner context. if (trackMetrics) { @@ -2686,7 +2728,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, builder.setScanMetrics(metricBuilder.build()); } } - region.updateReadRequestsCount(i); + region.updateReadRequestsCount(numOfResults); long end = EnvironmentEdgeManager.currentTime(); long responseCellSize = context != null ? context.getResponseCellSize() : 0; region.getMetrics().updateScanTime(end - before); @@ -2701,7 +2743,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (region.getCoprocessorHost() != null) { region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true); } - return builder.getMoreResultsInRegion(); } /** @@ -2809,14 +2850,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // now let's do the real scan. long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); RegionScanner scanner = rsh.s; - boolean moreResults = true; - boolean moreResultsInRegion = true; // this is the limit of rows for this scan, if we the number of rows reach this value, we will // close the scanner. int limitOfRows; if (request.hasLimitOfRows()) { limitOfRows = request.getLimitOfRows(); - rows = Math.min(rows, limitOfRows); } else { limitOfRows = -1; } @@ -2839,32 +2877,44 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } if (!done) { - moreResultsInRegion = scan((HBaseRpcController) controller, request, rsh, - maxQuotaResultSize, rows, results, builder, lastBlock, context); + scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows, + results, builder, lastBlock, context); } } quota.addScanResult(results); - + addResults(builder, results, (HBaseRpcController) controller, + RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())); if (scanner.isFilterDone() && results.isEmpty()) { // If the scanner's filter - if any - is done with the scan // only set moreResults to false if the results is empty. This is used to keep compatible // with the old scan implementation where we just ignore the returned results if moreResults // is false. Can remove the isEmpty check after we get rid of the old implementation. - moreResults = false; - } else if (limitOfRows > 0 && !results.isEmpty() && - !results.get(results.size() - 1).mayHaveMoreCellsInRow() && - ConnectionUtils.numberOfIndividualRows(results) >= limitOfRows) { - // if we have reached the limit of rows - moreResults = false; + builder.setMoreResults(false); } - addResults(builder, results, (HBaseRpcController) controller, - RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())); - if (!moreResults || !moreResultsInRegion || closeScanner) { + // we only set moreResults to false in the above code, so set it to true if we haven't set it + // yet. + if (!builder.hasMoreResults()) { + builder.setMoreResults(true); + } + if (builder.getMoreResults() && builder.getMoreResultsInRegion() && !results.isEmpty()) { + // Record the last cell of the last result if it is a partial result + // We need this to calculate the complete rows we have returned to client as the + // mayHaveMoreCellsInRow is true does not mean that there will be extra cells for the + // current row. We may filter out all the remaining cells for the current row and just + // return the cells of the nextRow when calling RegionScanner.nextRaw. So here we need to + // check for row change. + Result lastResult = results.get(results.size() - 1); + if (lastResult.mayHaveMoreCellsInRow()) { + rsh.rowOfLastPartialResult = lastResult.getRow(); + } else { + rsh.rowOfLastPartialResult = null; + } + } + if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) { scannerClosed = true; closeScanner(region, scanner, scannerName, context); } - builder.setMoreResults(moreResults); return builder.build(); } catch (Exception e) { try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java index 67b26932e0a..2c5fd0192f2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java @@ -221,7 +221,7 @@ public class ScannerContext { * @return true when we have more cells for the current row. This usually because we have reached * a limit in the middle of a row */ - boolean hasMoreCellsInRow() { + boolean mayHaveMoreCellsInRow() { return scannerState == NextState.SIZE_LIMIT_REACHED_MID_ROW || scannerState == NextState.TIME_LIMIT_REACHED_MID_ROW || scannerState == NextState.BATCH_LIMIT_REACHED; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/ColumnCountOnRowFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/ColumnCountOnRowFilter.java new file mode 100644 index 00000000000..c4b4d2879bd --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/ColumnCountOnRowFilter.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.util.Bytes; + +@InterfaceAudience.Private +public final class ColumnCountOnRowFilter extends FilterBase { + + private final int limit; + + private int count = 0; + + public ColumnCountOnRowFilter(int limit) { + this.limit = limit; + } + + @Override + public ReturnCode filterKeyValue(Cell v) throws IOException { + count++; + return count > limit ? ReturnCode.NEXT_ROW : ReturnCode.INCLUDE; + } + + @Override + public void reset() throws IOException { + this.count = 0; + } + + @Override + public byte[] toByteArray() throws IOException { + return Bytes.toBytes(limit); + } + + public static ColumnCountOnRowFilter parseFrom(byte[] bytes) throws DeserializationException { + return new ColumnCountOnRowFilter(Bytes.toInt(bytes)); + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLimitedScanWithFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLimitedScanWithFilter.java new file mode 100644 index 00000000000..f702e3d55db --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLimitedScanWithFilter.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * With filter we may stop at a middle of row and think that we still have more cells for the + * current row but actually all the remaining cells will be filtered out by the filter. So it will + * lead to a Result that mayHaveMoreCellsInRow is true but actually there are no cells for the same + * row. Here we want to test if our limited scan still works. + */ +@Category({ MediumTests.class, ClientTests.class }) +public class TestLimitedScanWithFilter { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static final TableName TABLE_NAME = TableName.valueOf("TestRegionScanner"); + + private static final byte[] FAMILY = Bytes.toBytes("cf"); + + private static final byte[][] CQS = + { Bytes.toBytes("cq1"), Bytes.toBytes("cq2"), Bytes.toBytes("cq3"), Bytes.toBytes("cq4") }; + + private static int ROW_COUNT = 10; + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniCluster(1); + try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) { + for (int i = 0; i < ROW_COUNT; i++) { + Put put = new Put(Bytes.toBytes(i)); + for (int j = 0; j < CQS.length; j++) { + put.addColumn(FAMILY, CQS[j], Bytes.toBytes((j + 1) * i)); + } + table.put(put); + } + } + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + public void testCompleteResult() throws IOException { + int limit = 5; + Scan scan = + new Scan().setFilter(new ColumnCountOnRowFilter(2)).setMaxResultSize(1).setLimit(limit); + try (Table table = UTIL.getConnection().getTable(TABLE_NAME); + ResultScanner scanner = table.getScanner(scan)) { + for (int i = 0; i < limit; i++) { + Result result = scanner.next(); + assertEquals(i, Bytes.toInt(result.getRow())); + assertEquals(2, result.size()); + assertFalse(result.mayHaveMoreCellsInRow()); + assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQS[0]))); + assertEquals(2 * i, Bytes.toInt(result.getValue(FAMILY, CQS[1]))); + } + assertNull(scanner.next()); + } + } + + @Test + public void testAllowPartial() throws IOException { + int limit = 5; + Scan scan = new Scan().setFilter(new ColumnCountOnRowFilter(2)).setMaxResultSize(1) + .setAllowPartialResults(true).setLimit(limit); + try (Table table = UTIL.getConnection().getTable(TABLE_NAME); + ResultScanner scanner = table.getScanner(scan)) { + for (int i = 0; i < 2 * limit; i++) { + int key = i / 2; + Result result = scanner.next(); + assertEquals(key, Bytes.toInt(result.getRow())); + assertEquals(1, result.size()); + assertTrue(result.mayHaveMoreCellsInRow()); + int cqIndex = i % 2; + assertEquals(key * (cqIndex + 1), Bytes.toInt(result.getValue(FAMILY, CQS[cqIndex]))); + } + assertNull(scanner.next()); + } + } + + @Test + public void testBatchAllowPartial() throws IOException { + int limit = 5; + Scan scan = new Scan().setFilter(new ColumnCountOnRowFilter(3)).setBatch(2).setMaxResultSize(1) + .setAllowPartialResults(true).setLimit(limit); + try (Table table = UTIL.getConnection().getTable(TABLE_NAME); + ResultScanner scanner = table.getScanner(scan)) { + for (int i = 0; i < 3 * limit; i++) { + int key = i / 3; + Result result = scanner.next(); + assertEquals(key, Bytes.toInt(result.getRow())); + assertEquals(1, result.size()); + assertTrue(result.mayHaveMoreCellsInRow()); + int cqIndex = i % 3; + assertEquals(key * (cqIndex + 1), Bytes.toInt(result.getValue(FAMILY, CQS[cqIndex]))); + } + assertNull(scanner.next()); + } + } + + @Test + public void testBatch() throws IOException { + int limit = 5; + Scan scan = new Scan().setFilter(new ColumnCountOnRowFilter(2)).setBatch(2).setMaxResultSize(1) + .setLimit(limit); + try (Table table = UTIL.getConnection().getTable(TABLE_NAME); + ResultScanner scanner = table.getScanner(scan)) { + for (int i = 0; i < limit; i++) { + Result result = scanner.next(); + assertEquals(i, Bytes.toInt(result.getRow())); + assertEquals(2, result.size()); + assertTrue(result.mayHaveMoreCellsInRow()); + assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQS[0]))); + assertEquals(2 * i, Bytes.toInt(result.getValue(FAMILY, CQS[1]))); + } + assertNull(scanner.next()); + } + } + + @Test + public void testBatchAndFilterDiffer() throws IOException { + int limit = 5; + Scan scan = new Scan().setFilter(new ColumnCountOnRowFilter(3)).setBatch(2).setMaxResultSize(1) + .setLimit(limit); + try (Table table = UTIL.getConnection().getTable(TABLE_NAME); + ResultScanner scanner = table.getScanner(scan)) { + for (int i = 0; i < limit; i++) { + Result result = scanner.next(); + assertEquals(i, Bytes.toInt(result.getRow())); + assertEquals(2, result.size()); + assertTrue(result.mayHaveMoreCellsInRow()); + assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQS[0]))); + assertEquals(2 * i, Bytes.toInt(result.getValue(FAMILY, CQS[1]))); + result = scanner.next(); + assertEquals(i, Bytes.toInt(result.getRow())); + assertEquals(1, result.size()); + assertFalse(result.mayHaveMoreCellsInRow()); + assertEquals(3 * i, Bytes.toInt(result.getValue(FAMILY, CQS[2]))); + } + assertNull(scanner.next()); + } + } +}