From 1849e8a5a77373b5fb8e354c3f20214a80eb8c1a Mon Sep 17 00:00:00 2001 From: zhangduo Date: Wed, 15 Mar 2017 18:26:51 +0800 Subject: [PATCH] HBASE-17740 Correct the semantic of batch and partial for async client --- .../client/AllowPartialScanResultCache.java | 31 ++- .../hbase/client/AsyncClientScanner.java | 4 +- .../hbase/client/BatchScanResultCache.java | 142 ++++++++++ .../hadoop/hbase/client/ClientScanner.java | 253 +----------------- .../hadoop/hbase/client/ConnectionUtils.java | 14 + .../apache/hadoop/hbase/client/Result.java | 72 ++--- .../TestAllowPartialScanResultCache.java | 33 ++- .../client/TestBatchScanResultCache.java | 113 ++++++++ .../TestCompleteResultScanResultCache.java | 5 +- .../client/TestRawAsyncTablePartialScan.java | 119 ++++++++ 10 files changed, 471 insertions(+), 315 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTablePartialScan.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java index caecfd437cd..82f1ea0c624 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells; + import java.io.IOException; import java.util.Arrays; @@ -36,10 +38,6 @@ class AllowPartialScanResultCache implements ScanResultCache { // beginning of a row when retry. private Cell lastCell; - private Result filterCells(Result result) { - return lastCell == null ? 
result : ConnectionUtils.filterCells(result, lastCell); - } - private void updateLastCell(Result result) { lastCell = result.rawCells()[result.rawCells().length - 1]; } @@ -49,22 +47,23 @@ class AllowPartialScanResultCache implements ScanResultCache { if (results.length == 0) { return EMPTY_RESULT_ARRAY; } - Result first = filterCells(results[0]); - if (results.length == 1) { - if (first == null) { - // do not update last cell if we filter out all cells - return EMPTY_RESULT_ARRAY; + int i; + for (i = 0; i < results.length; i++) { + Result r = filterCells(results[i], lastCell); + if (r != null) { + results[i] = r; + break; } - updateLastCell(results[0]); - results[0] = first; - return results; + } + if (i == results.length) { + return EMPTY_RESULT_ARRAY; } updateLastCell(results[results.length - 1]); - if (first == null) { - return Arrays.copyOfRange(results, 1, results.length); + if (i > 0) { + return Arrays.copyOfRange(results, i, results.length); + } else { + return results; } - results[0] = first; - return results; } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java index 2215d360db0..fa7aa819e07 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; +import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache; import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType; import java.io.IOException; @@ -86,8 +87,7 @@ class AsyncClientScanner { this.scanTimeoutNs = scanTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs; this.startLogErrorsCnt = startLogErrorsCnt; - this.resultCache = 
scan.getAllowPartialResults() || scan.getBatch() > 0 - ? new AllowPartialScanResultCache() : new CompleteScanResultCache(); + this.resultCache = createScanResultCache(scan); } private static final class OpenScannerResponse { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java new file mode 100644 index 00000000000..9ab959b395c --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * A scan result cache for batched scan, i.e, + * {@code scan.getBatch() > 0 && !scan.getAllowPartialResults()}. + *

+ * If the user calls setBatch(5) and the rpc returns 3+5+5+5+3 cells, we should return 5+5+5+5+1 + * to the user. setBatch doesn't mean setAllowPartialResults(true). + */ +@InterfaceAudience.Private +public class BatchScanResultCache implements ScanResultCache { + + private final int batch; + + // used to filter out the cells that were already returned to the user, as we always start from + // the beginning of a row when retrying. + private Cell lastCell; + + private final Deque partialResults = new ArrayDeque<>(); + + private int numCellsOfPartialResults; + + public BatchScanResultCache(int batch) { + this.batch = batch; + } + + private void updateLastCell(Result result) { + lastCell = result.rawCells()[result.rawCells().length - 1]; + } + + private Result createCompletedResult() throws IOException { + Result result = Result.createCompleteResult(partialResults); + partialResults.clear(); + numCellsOfPartialResults = 0; + return result; + } + + // Add the new result to the partial list and return a batched Result if the cached size reaches + // the batch limit. As the RS also respects scan.getBatch, we can be sure that we will get at + // most one Result back (or null, which means we do not have enough cells yet). 
+ private Result regroupResults(Result result) { + partialResults.addLast(result); + numCellsOfPartialResults += result.size(); + if (numCellsOfPartialResults < batch) { + return null; + } + Cell[] cells = new Cell[batch]; + int cellCount = 0; + boolean stale = false; + for (;;) { + Result r = partialResults.pollFirst(); + stale = stale || r.isStale(); + int newCellCount = cellCount + r.size(); + if (newCellCount > batch) { + // We have more cells than expected, so split the current result + int len = batch - cellCount; + System.arraycopy(r.rawCells(), 0, cells, cellCount, len); + Cell[] remainingCells = new Cell[r.size() - len]; + System.arraycopy(r.rawCells(), len, remainingCells, 0, r.size() - len); + partialResults.addFirst( + Result.create(remainingCells, r.getExists(), r.isStale(), r.mayHaveMoreCellsInRow())); + break; + } + System.arraycopy(r.rawCells(), 0, cells, cellCount, r.size()); + if (newCellCount == batch) { + break; + } + cellCount = newCellCount; + } + numCellsOfPartialResults -= batch; + return Result.create(cells, null, stale, + result.mayHaveMoreCellsInRow() || !partialResults.isEmpty()); + } + + @Override + public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException { + if (results.length == 0) { + if (!partialResults.isEmpty() && !isHeartbeatMessage) { + return new Result[] { createCompletedResult() }; + } + return EMPTY_RESULT_ARRAY; + } + List regroupedResults = new ArrayList<>(); + for (Result result : results) { + result = filterCells(result, lastCell); + if (result == null) { + continue; + } + // check if we have a row change + if (!partialResults.isEmpty() && + !Bytes.equals(partialResults.peek().getRow(), result.getRow())) { + regroupedResults.add(createCompletedResult()); + } + Result regroupedResult = regroupResults(result); + if (regroupedResult != null) { + regroupedResults.add(regroupedResult); + // only update last cell when we actually return it to user. 
+ updateLastCell(regroupedResult); + } + if (!result.mayHaveMoreCellsInRow() && !partialResults.isEmpty()) { + // We are done for this row + regroupedResults.add(createCompletedResult()); + } + } + return regroupedResults.toArray(new Result[0]); + } + + @Override + public void clear() { + partialResults.clear(); + numCellsOfPartialResults = 0; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index bd3d4efcb74..a8b029ff432 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -18,16 +18,15 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; +import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache; import static org.apache.hadoop.hbase.client.ConnectionUtils.numberOfIndividualRows; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.io.InterruptedIOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedList; -import java.util.List; import java.util.Queue; import java.util.concurrent.ExecutorService; @@ -35,8 +34,6 @@ import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; @@ -69,24 +66,7 @@ public abstract class ClientScanner extends AbstractClientScanner { protected HRegionInfo currentRegion = null; protected ScannerCallableWithReplicas callable = null; protected Queue cache; - /** - * A list 
of partial results that have been returned from the server. This list should only - * contain results if this scanner does not have enough partial results to form the complete - * result. - */ - protected int partialResultsCellSizes = 0; - protected final LinkedList partialResults = new LinkedList<>(); - - /** - * The row for which we are accumulating partial Results (i.e. the row of the Results stored - * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync via - * the methods {@link #regroupResults(Result)} and {@link #clearPartialResults()} - */ - protected byte[] partialResultsRow = null; - /** - * The last cell from a not full Row which is added to cache - */ - protected Cell lastCellLoadedToCache = null; + private final ScanResultCache scanResultCache; protected final int caching; protected long lastNext; // Keep lastResult returned successfully in case we have to reset scanner. @@ -159,6 +139,8 @@ public abstract class ClientScanner extends AbstractClientScanner { this.rpcControllerFactory = controllerFactory; this.conf = conf; + + this.scanResultCache = createScanResultCache(scan); initCache(); } @@ -356,14 +338,7 @@ public abstract class ClientScanner extends AbstractClientScanner { private void closeScannerIfExhausted(boolean exhausted) throws IOException { if (exhausted) { - if (!partialResults.isEmpty()) { - // XXX: continue if there are partial results. But in fact server should not set - // hasMoreResults to false if there are partial results. 
- LOG.warn("Server tells us there is no more results for this region but we still have" + - " partialResults, this should not happen, retry on the current scanner anyway"); - } else { - closeScanner(); - } + closeScanner(); } } @@ -371,7 +346,7 @@ public abstract class ClientScanner extends AbstractClientScanner { MutableBoolean retryAfterOutOfOrderException, int retriesLeft) throws DoNotRetryIOException { // An exception was thrown which makes any partial results that we were collecting // invalid. The scanner will need to be reset to the beginning of a row. - clearPartialResults(); + scanResultCache.clear(); // Unfortunately, DNRIOE is used in two different semantics. // (1) The first is to close the client scanner and bubble up the exception all the way @@ -465,7 +440,7 @@ public abstract class ClientScanner extends AbstractClientScanner { if (callable.switchedToADifferentReplica()) { // Any accumulated partial results are no longer valid since the callable will // openScanner with the correct startkey and we must pick up from there - clearPartialResults(); + scanResultCache.clear(); this.currentRegion = callable.getHRegionInfo(); } retryAfterOutOfOrderException.setValue(true); @@ -485,29 +460,19 @@ public abstract class ClientScanner extends AbstractClientScanner { // Groom the array of Results that we received back from the server before adding that // Results to the scanner's cache. If partial results are not allowed to be seen by the // caller, all book keeping will be performed within this method. 
- List resultsToAddToCache = - getResultsToAddToCache(values, callable.isHeartbeatMessage()); - if (!resultsToAddToCache.isEmpty()) { + Result[] resultsToAddToCache = scanResultCache.addAndGet(values, callable.isHeartbeatMessage()); + if (resultsToAddToCache.length > 0) { for (Result rs : resultsToAddToCache) { - rs = filterLoadedCell(rs); - if (rs == null) { - continue; - } - cache.add(rs); long estimatedHeapSizeOfResult = calcEstimatedSize(rs); countdown--; remainingResultSize -= estimatedHeapSizeOfResult; addEstimatedSize(estimatedHeapSizeOfResult); this.lastResult = rs; - if (this.lastResult.mayHaveMoreCellsInRow()) { - updateLastCellLoadedToCache(this.lastResult); - } else { - this.lastCellLoadedToCache = null; - } } - if (scan.getLimit() > 0 && !resultsToAddToCache.isEmpty()) { - int newLimit = scan.getLimit() - numberOfIndividualRows(resultsToAddToCache); + if (scan.getLimit() > 0) { + int newLimit = + scan.getLimit() - numberOfIndividualRows(Arrays.asList(resultsToAddToCache)); assert newLimit >= 0; scan.setLimit(newLimit); } @@ -550,13 +515,6 @@ public abstract class ClientScanner extends AbstractClientScanner { } // we are done with the current region if (regionExhausted) { - if (!partialResults.isEmpty()) { - // XXX: continue if there are partial results. But in fact server should not set - // hasMoreResults to false if there are partial results. - LOG.warn("Server tells us there is no more results for this region but we still have" + - " partialResults, this should not happen, retry on the current scanner anyway"); - continue; - } if (!moveToNextRegion()) { break; } @@ -573,142 +531,6 @@ public abstract class ClientScanner extends AbstractClientScanner { return cache != null ? cache.size() : 0; } - /** - * This method ensures all of our book keeping regarding partial results is kept up to date. This - * method should be called once we know that the results we received back from the RPC request do - * not contain errors. 
We return a list of results that should be added to the cache. In general, - * this list will contain all NON-partial results from the input array (unless the client has - * specified that they are okay with receiving partial results) - * @param resultsFromServer The array of {@link Result}s returned from the server - * @param heartbeatMessage Flag indicating whether or not the response received from the server - * represented a complete response, or a heartbeat message that was sent to keep the - * client-server connection alive - * @return the list of results that should be added to the cache. - * @throws IOException - */ - protected List getResultsToAddToCache(Result[] resultsFromServer, - boolean heartbeatMessage) throws IOException { - int resultSize = resultsFromServer != null ? resultsFromServer.length : 0; - List resultsToAddToCache = new ArrayList<>(resultSize); - - // If the caller has indicated in their scan that they are okay with seeing partial results, - // then simply add all results to the list. Note allowPartial and setBatch are not same, we can - // return here if allow partials and we will handle batching later. - if (scan.getAllowPartialResults()) { - addResultsToList(resultsToAddToCache, resultsFromServer, 0, - (null == resultsFromServer ? 
0 : resultsFromServer.length)); - return resultsToAddToCache; - } - - // If no results were returned it indicates that either we have the all the partial results - // necessary to construct the complete result or the server had to send a heartbeat message - // to the client to keep the client-server connection alive - if (resultsFromServer == null || resultsFromServer.length == 0) { - // If this response was an empty heartbeat message, then we have not exhausted the region - // and thus there may be more partials server side that still need to be added to the partial - // list before we form the complete Result - if (!partialResults.isEmpty() && !heartbeatMessage) { - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); - clearPartialResults(); - } - - return resultsToAddToCache; - } - - for(Result result : resultsFromServer) { - if (partialResultsRow != null && Bytes.compareTo(result.getRow(), partialResultsRow) != 0) { - // We have a new row, complete the previous row. - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); - clearPartialResults(); - } - Result res = regroupResults(result); - if (res != null) { - resultsToAddToCache.add(res); - } - if (!result.mayHaveMoreCellsInRow()) { - // We are done for this row - if (partialResultsCellSizes > 0) { - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); - } - clearPartialResults(); - } - } - - - return resultsToAddToCache; - } - - /** - * Add new result to the partial list and return a batched Result if caching size exceed - * batching limit. - * If user setBatch(5) and rpc returns 3+5+5+5+3 cells, we should return 5+5+5+5+1 to user. - * setBatch doesn't mean setAllowPartialResult(true) - * @param result The result that we want to add to our list of partial Results - * @return the result if we have batch limit and there is one Result can be returned to user, or - * null if we have not. 
- * @throws IOException - */ - private Result regroupResults(final Result result) throws IOException { - partialResultsRow = result.getRow(); - partialResults.add(result); - partialResultsCellSizes += result.size(); - if (scan.getBatch() > 0 && partialResultsCellSizes >= scan.getBatch()) { - Cell[] cells = new Cell[scan.getBatch()]; - int count = 0; - boolean stale = false; - while (count < scan.getBatch()) { - Result res = partialResults.poll(); - stale = stale || res.isStale(); - if (res.size() + count <= scan.getBatch()) { - System.arraycopy(res.rawCells(), 0, cells, count, res.size()); - count += res.size(); - } else { - int len = scan.getBatch() - count; - System.arraycopy(res.rawCells(), 0, cells, count, len); - Cell[] remainingCells = new Cell[res.size() - len]; - System.arraycopy(res.rawCells(), len, remainingCells, 0, res.size() - len); - Result remainingRes = Result.create(remainingCells, res.getExists(), res.isStale(), - res.mayHaveMoreCellsInRow()); - partialResults.addFirst(remainingRes); - count = scan.getBatch(); - } - } - partialResultsCellSizes -= scan.getBatch(); - if (partialResultsCellSizes == 0) { - // We have nothing in partialResults, clear the flags to prevent returning empty Result - // when next result belongs to the next row. - clearPartialResults(); - } - return Result.create(cells, null, stale, - partialResultsCellSizes > 0 || result.mayHaveMoreCellsInRow()); - } - return null; - } - - /** - * Convenience method for clearing the list of partials and resetting the partialResultsRow. 
- */ - private void clearPartialResults() { - partialResults.clear(); - partialResultsCellSizes = 0; - partialResultsRow = null; - } - - /** - * Helper method for adding results between the indices [start, end) to the outputList - * @param outputList the list that results will be added to - * @param inputArray the array that results are taken from - * @param start beginning index (inclusive) - * @param end ending index (exclusive) - */ - private void addResultsToList(List outputList, Result[] inputArray, int start, int end) { - if (inputArray == null || start < 0 || end > inputArray.length) return; - - for (int i = start; i < end; i++) { - outputList.add(inputArray[i]); - } - } - @Override public void close() { if (!scanMetricsPublished) writeScanMetrics(); @@ -749,57 +571,6 @@ public abstract class ClientScanner extends AbstractClientScanner { return false; } - protected void updateLastCellLoadedToCache(Result result) { - if (result.rawCells().length == 0) { - return; - } - this.lastCellLoadedToCache = result.rawCells()[result.rawCells().length - 1]; - } - - /** - * Compare two Cells considering reversed scanner. ReversedScanner only reverses rows, not - * columns. - */ - private int compare(Cell a, Cell b) { - CellComparator comparator = currentRegion != null && currentRegion.isMetaRegion() - ? CellComparator.META_COMPARATOR : CellComparator.COMPARATOR; - int r = comparator.compareRows(a, b); - if (r != 0) { - return this.scan.isReversed() ? -r : r; - } - return CellComparator.compareWithoutRow(a, b); - } - - private Result filterLoadedCell(Result result) { - // we only filter result when last result is partial - // so lastCellLoadedToCache and result should have same row key. - // However, if 1) read some cells; 1.1) delete this row at the same time 2) move region; - // 3) read more cell. lastCellLoadedToCache and result will be not at same row. 
- if (lastCellLoadedToCache == null || result.rawCells().length == 0) { - return result; - } - if (compare(this.lastCellLoadedToCache, result.rawCells()[0]) < 0) { - // The first cell of this result is larger than the last cell of loadcache. - // If user do not allow partial result, it must be true. - return result; - } - if (compare(this.lastCellLoadedToCache, result.rawCells()[result.rawCells().length - 1]) >= 0) { - // The last cell of this result is smaller than the last cell of loadcache, skip all. - return null; - } - - // The first one must not in filtered result, we start at the second. - int index = 1; - while (index < result.rawCells().length) { - if (compare(this.lastCellLoadedToCache, result.rawCells()[index]) < 0) { - break; - } - index++; - } - Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length); - return Result.create(list, result.getExists(), result.isStale(), result.mayHaveMoreCellsInRow()); - } - protected void initCache() { initSyncCache(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 2b758362483..3e7cd00c38b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -316,6 +316,10 @@ public final class ConnectionUtils { } static Result filterCells(Result result, Cell keepCellsAfter) { + if (keepCellsAfter == null) { + // do not need to filter + return result; + } // not the same row if (!CellUtil.matchingRow(keepCellsAfter, result.getRow(), 0, result.getRow().length)) { return result; @@ -410,4 +414,14 @@ public final class ConnectionUtils { public static int numberOfIndividualRows(List results) { return (int) results.stream().filter(r -> !r.mayHaveMoreCellsInRow()).count(); } + + public static ScanResultCache createScanResultCache(Scan scan) { + if 
(scan.getAllowPartialResults()) { + return new AllowPartialScanResultCache(); + } else if (scan.getBatch() > 0) { + return new BatchScanResultCache(scan.getBatch()); + } else { + return new CompleteScanResultCache(); + } + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java index 4752d70108f..f8682ecfd58 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java @@ -24,7 +24,9 @@ import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -145,11 +147,11 @@ public class Result implements CellScannable, CellScanner { } public static Result create(List cells, Boolean exists, boolean stale, - boolean hasMoreCellsInRow) { + boolean mayHaveMoreCellsInRow) { if (exists != null){ - return new Result(null, exists, stale, hasMoreCellsInRow); + return new Result(null, exists, stale, mayHaveMoreCellsInRow); } - return new Result(cells.toArray(new Cell[cells.size()]), null, stale, hasMoreCellsInRow); + return new Result(cells.toArray(new Cell[cells.size()]), null, stale, mayHaveMoreCellsInRow); } /** @@ -792,44 +794,42 @@ public class Result implements CellScannable, CellScanner { * @throws IOException A complete result cannot be formed because the results in the partial list * come from different rows */ - public static Result createCompleteResult(List partialResults) + public static Result createCompleteResult(Iterable partialResults) throws IOException { + if (partialResults == null) { + return Result.create(Collections.emptyList(), null, false); + } List cells = new ArrayList<>(); boolean stale = false; byte[] prevRow = null; byte[] currentRow = 
null; - - if (partialResults != null && !partialResults.isEmpty()) { - for (int i = 0; i < partialResults.size(); i++) { - Result r = partialResults.get(i); - currentRow = r.getRow(); - if (prevRow != null && !Bytes.equals(prevRow, currentRow)) { - throw new IOException( - "Cannot form complete result. Rows of partial results do not match." + - " Partial Results: " + partialResults); - } - - // Ensure that all Results except the last one are marked as partials. The last result - // may not be marked as a partial because Results are only marked as partials when - // the scan on the server side must be stopped due to reaching the maxResultSize. - // Visualizing it makes it easier to understand: - // maxResultSize: 2 cells - // (-x-) represents cell number x in a row - // Example: row1: -1- -2- -3- -4- -5- (5 cells total) - // How row1 will be returned by the server as partial Results: - // Result1: -1- -2- (2 cells, size limit reached, mark as partial) - // Result2: -3- -4- (2 cells, size limit reached, mark as partial) - // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial) - if (i != (partialResults.size() - 1) && !r.mayHaveMoreCellsInRow()) { - throw new IOException( - "Cannot form complete result. Result is missing partial flag. " + - "Partial Results: " + partialResults); - } - prevRow = currentRow; - stale = stale || r.isStale(); - for (Cell c : r.rawCells()) { - cells.add(c); - } + for (Iterator iter = partialResults.iterator(); iter.hasNext();) { + Result r = iter.next(); + currentRow = r.getRow(); + if (prevRow != null && !Bytes.equals(prevRow, currentRow)) { + throw new IOException( + "Cannot form complete result. Rows of partial results do not match." + + " Partial Results: " + partialResults); + } + // Ensure that all Results except the last one are marked as partials. 
The last result + // may not be marked as a partial because Results are only marked as partials when + // the scan on the server side must be stopped due to reaching the maxResultSize. + // Visualizing it makes it easier to understand: + // maxResultSize: 2 cells + // (-x-) represents cell number x in a row + // Example: row1: -1- -2- -3- -4- -5- (5 cells total) + // How row1 will be returned by the server as partial Results: + // Result1: -1- -2- (2 cells, size limit reached, mark as partial) + // Result2: -3- -4- (2 cells, size limit reached, mark as partial) + // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial) + if (iter.hasNext() && !r.mayHaveMoreCellsInRow()) { + throw new IOException("Cannot form complete result. Result is missing partial flag. " + + "Partial Results: " + partialResults); + } + prevRow = currentRow; + stale = stale || r.isStale(); + for (Cell c : r.rawCells()) { + cells.add(c); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java index fc5ba1499f5..3fe43a5a343 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java @@ -17,14 +17,14 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.*; +import static org.apache.hadoop.hbase.client.TestBatchScanResultCache.createCells; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; import java.io.IOException; import java.util.Arrays; -import java.util.stream.IntStream; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; @@ 
-51,10 +51,6 @@ public class TestAllowPartialScanResultCache { resultCache = null; } - private static Cell createCell(int key, int cq) { - return new KeyValue(Bytes.toBytes(key), CF, Bytes.toBytes("cq" + cq), Bytes.toBytes(key)); - } - @Test public void test() throws IOException { assertSame(ScanResultCache.EMPTY_RESULT_ARRAY, @@ -62,31 +58,34 @@ public class TestAllowPartialScanResultCache { assertSame(ScanResultCache.EMPTY_RESULT_ARRAY, resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true)); - Cell[] cells1 = IntStream.range(0, 10).mapToObj(i -> createCell(1, i)).toArray(Cell[]::new); - Cell[] cells2 = IntStream.range(0, 10).mapToObj(i -> createCell(2, i)).toArray(Cell[]::new); + Cell[] cells1 = createCells(CF, 1, 10); + Cell[] cells2 = createCells(CF, 2, 10); Result[] results1 = resultCache.addAndGet( new Result[] { Result.create(Arrays.copyOf(cells1, 5), null, false, true) }, false); assertEquals(1, results1.length); assertEquals(1, Bytes.toInt(results1[0].getRow())); assertEquals(5, results1[0].rawCells().length); - IntStream.range(0, 5).forEach( - i -> assertEquals(1, Bytes.toInt(results1[0].getValue(CF, Bytes.toBytes("cq" + i))))); + for (int i = 0; i < 5; i++) { + assertEquals(1, Bytes.toInt(results1[0].getValue(CF, Bytes.toBytes("cq" + i)))); + } Result[] results2 = resultCache.addAndGet( new Result[] { Result.create(Arrays.copyOfRange(cells1, 1, 10), null, false, true) }, false); assertEquals(1, results2.length); assertEquals(1, Bytes.toInt(results2[0].getRow())); assertEquals(5, results2[0].rawCells().length); - IntStream.range(5, 10).forEach( - i -> assertEquals(1, Bytes.toInt(results2[0].getValue(CF, Bytes.toBytes("cq" + i))))); + for (int i = 5; i < 10; i++) { + assertEquals(1, Bytes.toInt(results2[0].getValue(CF, Bytes.toBytes("cq" + i)))); + } - Result[] results3 = resultCache - .addAndGet(new Result[] { Result.create(cells1), Result.create(cells2) }, false); + Result[] results3 = + resultCache.addAndGet(new Result[] { 
Result.create(cells1), Result.create(cells2) }, false); assertEquals(1, results3.length); assertEquals(2, Bytes.toInt(results3[0].getRow())); assertEquals(10, results3[0].rawCells().length); - IntStream.range(0, 10).forEach( - i -> assertEquals(2, Bytes.toInt(results3[0].getValue(CF, Bytes.toBytes("cq" + i))))); + for (int i = 0; i < 10; i++) { + assertEquals(2, Bytes.toInt(results3[0].getValue(CF, Bytes.toBytes("cq" + i)))); + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java new file mode 100644 index 00000000000..31a4594d355 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ SmallTests.class, ClientTests.class }) +public class TestBatchScanResultCache { + + private static byte[] CF = Bytes.toBytes("cf"); + + private BatchScanResultCache resultCache; + + @Before + public void setUp() { + resultCache = new BatchScanResultCache(4); + } + + @After + public void tearDown() { + resultCache.clear(); + resultCache = null; + } + + static Cell createCell(byte[] cf, int key, int cq) { + return new KeyValue(Bytes.toBytes(key), cf, Bytes.toBytes("cq" + cq), Bytes.toBytes(key)); + } + + static Cell[] createCells(byte[] cf, int key, int numCqs) { + Cell[] cells = new Cell[numCqs]; + for (int i = 0; i < numCqs; i++) { + cells[i] = createCell(cf, key, i); + } + return cells; + } + + private void assertResultEquals(Result result, int key, int start, int to) { + assertEquals(to - start, result.size()); + for (int i = start; i < to; i++) { + assertEquals(key, Bytes.toInt(result.getValue(CF, Bytes.toBytes("cq" + i)))); + } + assertEquals(to - start == 4, result.mayHaveMoreCellsInRow()); + } + + @Test + public void test() throws IOException { + assertSame(ScanResultCache.EMPTY_RESULT_ARRAY, + resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false)); + assertSame(ScanResultCache.EMPTY_RESULT_ARRAY, + resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true)); + + Cell[] cells1 = createCells(CF, 1, 10); + Cell[] cells2 = createCells(CF, 2, 10); + Cell[] cells3 = 
createCells(CF, 3, 10); + assertEquals(0, resultCache.addAndGet( + new Result[] { Result.create(Arrays.copyOf(cells1, 3), null, false, true) }, false).length); + Result[] results = resultCache.addAndGet( + new Result[] { Result.create(Arrays.copyOfRange(cells1, 3, 7), null, false, true), + Result.create(Arrays.copyOfRange(cells1, 7, 10), null, false, true) }, + false); + assertEquals(2, results.length); + assertResultEquals(results[0], 1, 0, 4); + assertResultEquals(results[1], 1, 4, 8); + results = resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false); + assertEquals(1, results.length); + assertResultEquals(results[0], 1, 8, 10); + + results = resultCache.addAndGet( + new Result[] { Result.create(Arrays.copyOfRange(cells2, 0, 4), null, false, true), + Result.create(Arrays.copyOfRange(cells2, 4, 8), null, false, true), + Result.create(Arrays.copyOfRange(cells2, 8, 10), null, false, true), + Result.create(Arrays.copyOfRange(cells3, 0, 4), null, false, true), + Result.create(Arrays.copyOfRange(cells3, 4, 8), null, false, true), + Result.create(Arrays.copyOfRange(cells3, 8, 10), null, false, false) }, + false); + assertEquals(6, results.length); + assertResultEquals(results[0], 2, 0, 4); + assertResultEquals(results[1], 2, 4, 8); + assertResultEquals(results[2], 2, 8, 10); + assertResultEquals(results[3], 3, 0, 4); + assertResultEquals(results[4], 3, 4, 8); + assertResultEquals(results[5], 3, 8, 10); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java index a340e9f901c..8759593a5f0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertSame; import java.io.IOException; import 
java.util.Arrays; -import java.util.stream.IntStream; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; @@ -70,9 +69,9 @@ public class TestCompleteResultScanResultCache { resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true)); int count = 10; Result[] results = new Result[count]; - IntStream.range(0, count).forEach(i -> { + for (int i = 0; i < count; i++) { results[i] = Result.create(Arrays.asList(createCell(i, CQ1))); - }); + } assertSame(results, resultCache.addAndGet(results, false)); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTablePartialScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTablePartialScan.java new file mode 100644 index 00000000000..2a32206b11a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTablePartialScan.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; + +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestRawAsyncTablePartialScan { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("async"); + + private static byte[] FAMILY = Bytes.toBytes("cf"); + + private static byte[][] CQS = + new byte[][] { Bytes.toBytes("cq1"), Bytes.toBytes("cq2"), Bytes.toBytes("cq3") }; + + private static int COUNT = 100; + + private static AsyncConnection CONN; + + private static RawAsyncTable TABLE; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + TEST_UTIL.waitTableAvailable(TABLE_NAME); + CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + TABLE = CONN.getRawTable(TABLE_NAME); + TABLE + .putAll(IntStream.range(0, COUNT) + .mapToObj(i -> new Put(Bytes.toBytes(String.format("%02d", i))) + .addColumn(FAMILY, CQS[0], Bytes.toBytes(i)) + .addColumn(FAMILY, CQS[1], Bytes.toBytes(2 * i)) + .addColumn(FAMILY, CQS[2], Bytes.toBytes(3 * i))) + .collect(Collectors.toList())) + .get(); + } + + @AfterClass + public static void tearDown() throws Exception { + CONN.close(); + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testBatchDoNotAllowPartial() throws 
InterruptedException, ExecutionException { + // we set batch to 2 and max result size to 1, then server will only returns one result per call + // but we should get 2 + 1 for every row. + List results = TABLE.scanAll(new Scan().setBatch(2).setMaxResultSize(1)).get(); + assertEquals(2 * COUNT, results.size()); + for (int i = 0; i < COUNT; i++) { + Result firstTwo = results.get(2 * i); + assertEquals(String.format("%02d", i), Bytes.toString(firstTwo.getRow())); + assertEquals(2, firstTwo.size()); + assertEquals(i, Bytes.toInt(firstTwo.getValue(FAMILY, CQS[0]))); + assertEquals(2 * i, Bytes.toInt(firstTwo.getValue(FAMILY, CQS[1]))); + + Result secondOne = results.get(2 * i + 1); + assertEquals(String.format("%02d", i), Bytes.toString(secondOne.getRow())); + assertEquals(1, secondOne.size()); + assertEquals(3 * i, Bytes.toInt(secondOne.getValue(FAMILY, CQS[2]))); + } + } + + @Test + public void testReversedBatchDoNotAllowPartial() throws InterruptedException, ExecutionException { + // we set batch to 2 and max result size to 1, then server will only returns one result per call + // but we should get 2 + 1 for every row. + List results = + TABLE.scanAll(new Scan().setBatch(2).setMaxResultSize(1).setReversed(true)).get(); + assertEquals(2 * COUNT, results.size()); + for (int i = 0; i < COUNT; i++) { + int row = COUNT - i - 1; + Result firstTwo = results.get(2 * i); + assertEquals(String.format("%02d", row), Bytes.toString(firstTwo.getRow())); + assertEquals(2, firstTwo.size()); + assertEquals(row, Bytes.toInt(firstTwo.getValue(FAMILY, CQS[0]))); + assertEquals(2 * row, Bytes.toInt(firstTwo.getValue(FAMILY, CQS[1]))); + + Result secondOne = results.get(2 * i + 1); + assertEquals(String.format("%02d", row), Bytes.toString(secondOne.getRow())); + assertEquals(1, secondOne.size()); + assertEquals(3 * row, Bytes.toInt(secondOne.getValue(FAMILY, CQS[2]))); + } + } +}