From 0d3e986f7e8e5052448f08ac08f644bfaadf89e1 Mon Sep 17 00:00:00 2001 From: Phil Yang Date: Thu, 2 Mar 2017 16:43:17 +0800 Subject: [PATCH] HBASE-15484 Correct the semantic of batch and partial --- ...syncScanSingleRegionRpcRetryingCaller.java | 2 +- .../hadoop/hbase/client/ClientScanner.java | 157 +++++++----------- .../hbase/client/CompleteScanResultCache.java | 2 +- .../hadoop/hbase/client/ConnectionUtils.java | 4 +- .../apache/hadoop/hbase/client/Result.java | 25 +-- .../client/ScannerCallableWithReplicas.java | 2 +- .../hadoop/hbase/protobuf/ProtobufUtil.java | 2 +- .../hbase/shaded/protobuf/ProtobufUtil.java | 9 +- .../procedure/MasterProcedureScheduler.java | 2 +- .../hadoop/hbase/regionserver/HRegion.java | 1 - .../hbase/regionserver/KeyValueScanner.java | 2 + .../hbase/regionserver/RSRpcServices.java | 4 +- .../TestPartialResultsFromClientSide.java | 111 +++++++++---- .../client/TestScannersFromClientSide2.java | 4 +- 14 files changed, 165 insertions(+), 162 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index abcc26eeb8b..dd843ed547e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -404,7 +404,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { private void updateNextStartRowWhenError(Result result) { nextStartRowWhenError = result.getRow(); - includeNextStartRowWhenError = result.hasMoreCellsInRow(); + includeNextStartRowWhenError = result.mayHaveMoreCellsInRow(); } private void completeWhenNoMoreResultsInRegion() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 47270a72c40..b11a84131d3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -76,10 +76,11 @@ public abstract class ClientScanner extends AbstractClientScanner { * result. */ protected final LinkedList partialResults = new LinkedList(); + protected int partialResultsCellSizes = 0; /** * The row for which we are accumulating partial Results (i.e. the row of the Results stored * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync via - * the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()} + * the methods {@link #regroupResults(Result)} and {@link #clearPartialResults()} */ protected byte[] partialResultsRow = null; /** @@ -406,7 +407,7 @@ public abstract class ClientScanner extends AbstractClientScanner { // If the lastRow is not partial, then we should start from the next row. As now we can // exclude the start row, the logic here is the same for both normal scan and reversed scan. // If lastResult is partial then include it, otherwise exclude it. - scan.withStartRow(lastResult.getRow(), lastResult.hasMoreCellsInRow()); + scan.withStartRow(lastResult.getRow(), lastResult.mayHaveMoreCellsInRow()); } if (e instanceof OutOfOrderScannerNextException) { if (retryAfterOutOfOrderException.isTrue()) { @@ -497,7 +498,7 @@ public abstract class ClientScanner extends AbstractClientScanner { remainingResultSize -= estimatedHeapSizeOfResult; addEstimatedSize(estimatedHeapSizeOfResult); this.lastResult = rs; - if (this.lastResult.hasMoreCellsInRow()) { + if (this.lastResult.mayHaveMoreCellsInRow()) { updateLastCellLoadedToCache(this.lastResult); } else { this.lastCellLoadedToCache = null; @@ -588,16 +589,10 @@ public abstract class ClientScanner extends AbstractClientScanner { int resultSize = resultsFromServer != null ? resultsFromServer.length : 0; List resultsToAddToCache = new ArrayList(resultSize); - final boolean isBatchSet = scan != null && scan.getBatch() > 0; - final boolean allowPartials = scan != null && scan.getAllowPartialResults(); - // If the caller has indicated in their scan that they are okay with seeing partial results, - // then simply add all results to the list. Note that since scan batching also returns results - // for a row in pieces we treat batch being set as equivalent to allowing partials. The - // implication of treating batching as equivalent to partial results is that it is possible - // the caller will receive a result back where the number of cells in the result is less than - // the batch size even though it may not be the last group of cells for that row. - if (allowPartials || isBatchSet) { + // then simply add all results to the list. Note allowPartial and setBatch are not same, we can + // return here if allow partials and we will handle batching later. + if (scan.getAllowPartialResults()) { addResultsToList(resultsToAddToCache, resultsFromServer, 0, (null == resultsFromServer ? 0 : resultsFromServer.length)); return resultsToAddToCache; @@ -618,100 +613,69 @@ public abstract class ClientScanner extends AbstractClientScanner { return resultsToAddToCache; } - // In every RPC response there should be at most a single partial result. Furthermore, if - // there is a partial result, it is guaranteed to be in the last position of the array. - Result last = resultsFromServer[resultsFromServer.length - 1]; - Result partial = last.hasMoreCellsInRow() ? last : null; - - if (LOG.isTraceEnabled()) { - StringBuilder sb = new StringBuilder(); - sb.append("number results from RPC: ").append(resultsFromServer.length).append(","); - sb.append("partial != null: ").append(partial != null).append(","); - sb.append("number of partials so far: ").append(partialResults.size()); - LOG.trace(sb.toString()); - } - - // There are three possibilities cases that can occur while handling partial results - // - // 1. (partial != null && partialResults.isEmpty()) - // This is the first partial result that we have received. It should be added to - // the list of partialResults and await the next RPC request at which point another - // portion of the complete result will be received - // - // 2. !partialResults.isEmpty() - // Since our partialResults list is not empty it means that we have been accumulating partial - // Results for a particular row. We cannot form the complete/whole Result for that row until - // all partials for the row have been received. Thus we loop through all of the Results - // returned from the server and determine whether or not all partial Results for the row have - // been received. We know that we have received all of the partial Results for the row when: - // i) We notice a row change in the Results - // ii) We see a Result for the partial row that is NOT marked as a partial Result - // - // 3. (partial == null && partialResults.isEmpty()) - // Business as usual. We are not accumulating partial results and there wasn't a partial result - // in the RPC response. This means that all of the results we received from the server are - // complete and can be added directly to the cache - if (partial != null && partialResults.isEmpty()) { - addToPartialResults(partial); - - // Exclude the last result, it's a partial - addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length - 1); - } else if (!partialResults.isEmpty()) { - for (int i = 0; i < resultsFromServer.length; i++) { - Result result = resultsFromServer[i]; - - // This result is from the same row as the partial Results. Add it to the list of partials - // and check if it was the last partial Result for that row - if (Bytes.equals(partialResultsRow, result.getRow())) { - addToPartialResults(result); - - // If the result is not a partial, it is a signal to us that it is the last Result we - // need to form the complete Result client-side - if (!result.hasMoreCellsInRow()) { - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); - clearPartialResults(); - } - } else { - // The row of this result differs from the row of the partial results we have received so - // far. If our list of partials isn't empty, this is a signal to form the complete Result - // since the row has now changed - if (!partialResults.isEmpty()) { - resultsToAddToCache.add(Result.createCompleteResult(partialResults)); - clearPartialResults(); - } - - // It's possible that in one response from the server we receive the final partial for - // one row and receive a partial for a different row. Thus, make sure that all Results - // are added to the proper list - if (result.hasMoreCellsInRow()) { - addToPartialResults(result); - } else { - resultsToAddToCache.add(result); - } - } + for(Result result : resultsFromServer) { + if (partialResultsRow != null && Bytes.compareTo(result.getRow(), partialResultsRow) != 0) { + // We have a new row, complete the previous row. + resultsToAddToCache.add(Result.createCompleteResult(partialResults)); + clearPartialResults(); + } + Result res = regroupResults(result); + if (res != null) { + resultsToAddToCache.add(res); + } + if (!result.mayHaveMoreCellsInRow()) { + // We are done for this row + if (partialResultsCellSizes > 0) { + resultsToAddToCache.add(Result.createCompleteResult(partialResults)); + } + clearPartialResults(); } - } else { // partial == null && partialResults.isEmpty() -- business as usual - addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length); } + return resultsToAddToCache; } /** - * A convenience method for adding a Result to our list of partials. This method ensure that only - * Results that belong to the same row as the other partials can be added to the list. + * Add new result to the partial list and return a batched Result if caching size exceed + * batching limit. + * If user setBatch(5) and rpc returns 3+5+5+5+3 cells, we should return 5+5+5+5+1 to user. + * setBatch doesn't mean setAllowPartialResult(true) * @param result The result that we want to add to our list of partial Results + * @return the result if we have batch limit and there is one Result can be returned to user, or + * null if we have not. * @throws IOException */ - private void addToPartialResults(final Result result) throws IOException { - final byte[] row = result.getRow(); - if (partialResultsRow != null && !Bytes.equals(row, partialResultsRow)) { - throw new IOException("Partial result row does not match. All partial results must come " + - "from the same row. partialResultsRow: " + Bytes.toString(partialResultsRow) + "row: " + - Bytes.toString(row)); - } - partialResultsRow = row; + private Result regroupResults(final Result result) throws IOException { + partialResultsRow = result.getRow(); partialResults.add(result); + partialResultsCellSizes += result.size(); + if (scan.getBatch() > 0 && partialResultsCellSizes >= scan.getBatch()) { + Cell[] cells = new Cell[scan.getBatch()]; + int count = 0; + boolean stale = false; + while (count < scan.getBatch()) { + Result res = partialResults.poll(); + stale = stale || res.isStale(); + if (res.size() + count <= scan.getBatch()) { + System.arraycopy(res.rawCells(), 0, cells, count, res.size()); + count += res.size(); + } else { + int len = scan.getBatch() - count; + System.arraycopy(res.rawCells(), 0, cells, count, len); + Cell[] remainingCells = new Cell[res.size() - len]; + System.arraycopy(res.rawCells(), len, remainingCells, 0, res.size() - len); + Result remainingRes = Result.create(remainingCells, res.getExists(), res.isStale(), + res.mayHaveMoreCellsInRow()); + partialResults.addFirst(remainingRes); + count = scan.getBatch(); + } + } + partialResultsCellSizes -= scan.getBatch(); + return Result.create(cells, null, stale, + partialResultsCellSizes > 0 || result.mayHaveMoreCellsInRow()); + } + return null; } /** @@ -719,6 +683,7 @@ public abstract class ClientScanner extends AbstractClientScanner { */ private void clearPartialResults() { partialResults.clear(); + partialResultsCellSizes = 0; partialResultsRow = null; } @@ -825,7 +790,7 @@ public abstract class ClientScanner extends AbstractClientScanner { index++; } Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length); - return Result.create(list, result.getExists(), result.isStale(), result.hasMoreCellsInRow()); + return Result.create(list, result.getExists(), result.isStale(), result.mayHaveMoreCellsInRow()); } protected void initCache() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java index bc79e041b98..e09ddfb7058 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompleteScanResultCache.java @@ -76,7 +76,7 @@ class CompleteScanResultCache implements ScanResultCache { // In every RPC response there should be at most a single partial result. Furthermore, if // there is a partial result, it is guaranteed to be in the last position of the array. Result last = results[results.length - 1]; - if (last.hasMoreCellsInRow()) { + if (last.mayHaveMoreCellsInRow()) { if (partialResults.isEmpty()) { partialResults.add(last); return Arrays.copyOf(results, results.length - 1); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 28f0cee4c0f..e010e9aed93 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -322,7 +322,7 @@ public final class ConnectionUtils { return null; } return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null, - result.isStale(), result.hasMoreCellsInRow()); + result.isStale(), result.mayHaveMoreCellsInRow()); } // Add a delta to avoid timeout immediately after a retry sleeping. @@ -396,6 +396,6 @@ public final class ConnectionUtils { * */ public static int numberOfIndividualRows(List results) { - return (int) results.stream().filter(r -> !r.hasMoreCellsInRow()).count(); + return (int) results.stream().filter(r -> !r.mayHaveMoreCellsInRow()).count(); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java index 232e3d3b136..0fadbd9e3e3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java @@ -84,9 +84,9 @@ public class Result implements CellScannable, CellScanner { private boolean stale = false; /** - * See {@link #hasMoreCellsInRow()}. + * See {@link #mayHaveMoreCellsInRow()}. */ - private boolean hasMoreCellsInRow = false; + private boolean mayHaveMoreCellsInRow = false; // We're not using java serialization. Transient here is just a marker to say // that this is where we cache row if we're ever asked for it. private transient byte [] row = null; @@ -144,11 +144,12 @@ public class Result implements CellScannable, CellScanner { return create(cells, exists, stale, false); } - public static Result create(List cells, Boolean exists, boolean stale, boolean partial) { + public static Result create(List cells, Boolean exists, boolean stale, + boolean hasMoreCellsInRow) { if (exists != null){ - return new Result(null, exists, stale, partial); + return new Result(null, exists, stale, hasMoreCellsInRow); } - return new Result(cells.toArray(new Cell[cells.size()]), null, stale, partial); + return new Result(cells.toArray(new Cell[cells.size()]), null, stale, hasMoreCellsInRow); } /** @@ -177,7 +178,7 @@ public class Result implements CellScannable, CellScanner { this.cells = cells; this.exists = exists; this.stale = stale; - this.hasMoreCellsInRow = mayHaveMoreCellsInRow; + this.mayHaveMoreCellsInRow = mayHaveMoreCellsInRow; this.readonly = false; } @@ -822,7 +823,7 @@ public class Result implements CellScannable, CellScanner { // Result1: -1- -2- (2 cells, size limit reached, mark as partial) // Result2: -3- -4- (2 cells, size limit reached, mark as partial) // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial) - if (i != (partialResults.size() - 1) && !r.hasMoreCellsInRow()) { + if (i != (partialResults.size() - 1) && !r.mayHaveMoreCellsInRow()) { throw new IOException( "Cannot form complete result. Result is missing partial flag. " + "Partial Results: " + partialResults); @@ -909,13 +910,13 @@ public class Result implements CellScannable, CellScanner { * for a row and should be combined with a result representing the remaining cells in that row to * form a complete (non-partial) result. * @return Whether or not the result is a partial result - * @deprecated the word 'partial' ambiguous, use {@link #hasMoreCellsInRow()} instead. + * @deprecated the word 'partial' ambiguous, use {@link #mayHaveMoreCellsInRow()} instead. * Deprecated since 1.4.0. - * @see #hasMoreCellsInRow() + * @see #mayHaveMoreCellsInRow() */ @Deprecated public boolean isPartial() { - return hasMoreCellsInRow; + return mayHaveMoreCellsInRow; } /** @@ -927,8 +928,8 @@ public class Result implements CellScannable, CellScanner { * {@link Scan#setMaxResultSize(long)} and the default value can be seen here: * {@link HConstants#DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE} */ - public boolean hasMoreCellsInRow() { - return hasMoreCellsInRow; + public boolean mayHaveMoreCellsInRow() { + return mayHaveMoreCellsInRow; } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 100075382e3..101e8daf552 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -324,7 +324,7 @@ class ScannerCallableWithReplicas implements RetryingCallable { // 2. The last result was not a partial result which means it contained all of the cells for // that row (we no longer need any information from it). Set the start row to the next // closest row that could be seen. - callable.getScan().withStartRow(this.lastResult.getRow(), this.lastResult.hasMoreCellsInRow()); + callable.getScan().withStartRow(this.lastResult.getRow(), this.lastResult.mayHaveMoreCellsInRow()); } @VisibleForTesting diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 52ee8a56b30..d935a08a533 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -1329,7 +1329,7 @@ public final class ProtobufUtil { } builder.setStale(result.isStale()); - builder.setPartial(result.hasMoreCellsInRow()); + builder.setPartial(result.mayHaveMoreCellsInRow()); return builder.build(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 271a0def2c9..24302be9eff 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -20,11 +20,9 @@ package org.apache.hadoop.hbase.shaded.protobuf; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; -import java.io.InterruptedIOException; import java.lang.reflect.Constructor; import java.lang.reflect.Method; import java.nio.ByteBuffer; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -74,7 +72,6 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionLoadStats; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Scan.ReadType; import org.apache.hadoop.hbase.client.SnapshotDescription; import org.apache.hadoop.hbase.client.SnapshotType; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; @@ -92,8 +89,6 @@ import org.apache.hadoop.hbase.quotas.QuotaType; import org.apache.hadoop.hbase.quotas.ThrottleType; import org.apache.hadoop.hbase.replication.ReplicationLoadSink; import org.apache.hadoop.hbase.replication.ReplicationLoadSource; -import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.visibility.Authorizations; import org.apache.hadoop.hbase.security.visibility.CellVisibility; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; @@ -159,7 +154,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDe import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; @@ -171,7 +165,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescript import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.DynamicClassLoader; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.hbase.util.Methods; import org.apache.hadoop.hbase.util.VersionInfo; @@ -1443,7 +1436,7 @@ public final class ProtobufUtil { } builder.setStale(result.isStale()); - builder.setPartial(result.hasMoreCellsInRow()); + builder.setPartial(result.mayHaveMoreCellsInRow()); return builder.build(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 508f7c4e135..ebf79fa0b1f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -239,7 +239,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } } - private void clearQueue() { + protected void clearQueue() { // Remove Servers for (int i = 0; i < serverBuckets.length; ++i) { clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index cc32179e952..f2bc068e097 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -6183,7 +6183,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Ok, we are good, let's try to get some results from the main heap. populateResult(results, this.storeHeap, scannerContext, current); - if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) { if (hasFilterRow) { throw new IncompatibleFilterException( diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java index 44b081bf8aa..a4cb2f4da74 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java @@ -42,6 +42,8 @@ public interface KeyValueScanner extends Shipper, Closeable { /** * Look at the next Cell in this scanner, but do not iterate scanner. + * NOTICE: The returned cell has not been passed into ScanQueryMatcher. So it may not be what the + * user need. * @return the next Cell */ Cell peek(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index e6c2a49d92e..e6e43a4df9a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -486,7 +486,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (clientCellBlockSupported) { for (Result res : results) { builder.addCellsPerResult(res.size()); - builder.addPartialFlagPerResult(res.hasMoreCellsInRow()); + builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow()); } controller.setCellScanner(CellUtil.createCellScanner(results)); } else { @@ -3070,7 +3070,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // is false. Can remove the isEmpty check after we get rid of the old implementation. moreResults = false; } else if (limitOfRows > 0 && !results.isEmpty() && - !results.get(results.size() - 1).hasMoreCellsInRow() && + !results.get(results.size() - 1).mayHaveMoreCellsInRow() && ConnectionUtils.numberOfIndividualRows(results) >= limitOfRows) { // if we have reached the limit of rows moreResults = false; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java index 61e34677d77..aeb30c29fd6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPartialResultsFromClientSide.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -158,7 +159,7 @@ public class TestPartialResultsFromClientSide { message = "Ensuring the expected keyValues are present for row " + row; List expectedKeyValues = createKeyValuesForRow(ROWS[row], FAMILIES, QUALIFIERS, VALUE); Result result = partialScanner.next(); - assertFalse(result.hasMoreCellsInRow()); + assertFalse(result.mayHaveMoreCellsInRow()); verifyResult(result, expectedKeyValues, message); } @@ -178,7 +179,7 @@ public class TestPartialResultsFromClientSide { Result result = scanner.next(); assertTrue(result != null); - assertTrue(result.hasMoreCellsInRow()); + assertTrue(result.mayHaveMoreCellsInRow()); assertTrue(result.rawCells() != null); assertTrue(result.rawCells().length == 1); @@ -189,7 +190,7 @@ public class TestPartialResultsFromClientSide { result = scanner.next(); assertTrue(result != null); - assertTrue(!result.hasMoreCellsInRow()); + assertTrue(!result.mayHaveMoreCellsInRow()); assertTrue(result.rawCells() != null); assertTrue(result.rawCells().length == NUM_COLS); @@ -283,7 +284,7 @@ public class TestPartialResultsFromClientSide { for (Cell c : partialResult.rawCells()) { aggregatePartialCells.add(c); } - } while (partialResult.hasMoreCellsInRow()); + } while (partialResult.mayHaveMoreCellsInRow()); assertTrue("Number of cells differs. iteration: " + iterationCount, oneShotResult.rawCells().length == aggregatePartialCells.size()); @@ -353,7 +354,7 @@ public class TestPartialResultsFromClientSide { // the last group of cells that fit inside the maxResultSize assertTrue( "Result's cell count differed from expected number. result: " + result, - result.rawCells().length == expectedNumberOfCells || !result.hasMoreCellsInRow() + result.rawCells().length == expectedNumberOfCells || !result.mayHaveMoreCellsInRow() || !Bytes.equals(prevRow, result.getRow())); prevRow = result.getRow(); } @@ -431,11 +432,11 @@ public class TestPartialResultsFromClientSide { while ((result = scanner.next()) != null) { assertTrue(result.rawCells() != null); - if (result.hasMoreCellsInRow()) { + if (result.mayHaveMoreCellsInRow()) { final String error = "Cells:" + result.rawCells().length + " Batch size:" + batch + " cellsPerPartialResult:" + cellsPerPartialResult + " rep:" + repCount; - assertTrue(error, result.rawCells().length <= Math.min(batch, cellsPerPartialResult)); + assertTrue(error, result.rawCells().length == batch); } else { assertTrue(result.rawCells().length <= batch); } @@ -477,7 +478,7 @@ public class TestPartialResultsFromClientSide { do { partialResult = partialScanner.next(); partials.add(partialResult); - } while (partialResult != null && partialResult.hasMoreCellsInRow()); + } while (partialResult != null && partialResult.mayHaveMoreCellsInRow()); completeResult = Result.createCompleteResult(partials); oneShotResult = oneShotScanner.next(); @@ -538,7 +539,7 @@ public class TestPartialResultsFromClientSide { Result r = null; while ((r = scanner.next()) != null) { - assertFalse(r.hasMoreCellsInRow()); + assertFalse(r.mayHaveMoreCellsInRow()); } scanner.close(); @@ -588,7 +589,7 @@ public class TestPartialResultsFromClientSide { // hit before the caching limit and thus partial results may be seen boolean expectToSeePartialResults = resultSizeRowLimit < cachingRowLimit; while ((r = clientScanner.next()) != null) { - assertTrue(!r.hasMoreCellsInRow() || expectToSeePartialResults); + assertTrue(!r.mayHaveMoreCellsInRow() || expectToSeePartialResults); } scanner.close(); @@ -853,7 +854,7 @@ public class TestPartialResultsFromClientSide { assertEquals(1, result1.rawCells().length); Cell c1 = result1.rawCells()[0]; assertCell(c1, ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); - assertFalse(result1.hasMoreCellsInRow()); + assertFalse(result1.mayHaveMoreCellsInRow()); moveRegion(table, 2); @@ -861,7 +862,7 @@ public class TestPartialResultsFromClientSide { assertEquals(1, result2.rawCells().length); Cell c2 = result2.rawCells()[0]; assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]); - assertTrue(result2.hasMoreCellsInRow()); + assertTrue(result2.mayHaveMoreCellsInRow()); moveRegion(table, 3); @@ -869,7 +870,7 @@ public class TestPartialResultsFromClientSide { assertEquals(1, result3.rawCells().length); Cell c3 = result3.rawCells()[0]; assertCell(c3, ROWS[1], FAMILIES[0], QUALIFIERS[1]); - assertTrue(result3.hasMoreCellsInRow()); + assertTrue(result3.mayHaveMoreCellsInRow()); } @@ -892,7 +893,7 @@ public class TestPartialResultsFromClientSide { assertEquals(1, result1.rawCells().length); Cell c1 = result1.rawCells()[0]; assertCell(c1, ROWS[NUM_ROWS-1], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); - assertFalse(result1.hasMoreCellsInRow()); + assertFalse(result1.mayHaveMoreCellsInRow()); moveRegion(table, 2); @@ -900,7 +901,7 @@ public class TestPartialResultsFromClientSide { assertEquals(1, result2.rawCells().length); Cell c2 = result2.rawCells()[0]; assertCell(c2, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[0]); - assertTrue(result2.hasMoreCellsInRow()); + assertTrue(result2.mayHaveMoreCellsInRow()); moveRegion(table, 3); @@ -908,7 +909,7 @@ public class TestPartialResultsFromClientSide { assertEquals(1, result3.rawCells().length); Cell c3 = result3.rawCells()[0]; assertCell(c3, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[1]); - assertTrue(result3.hasMoreCellsInRow()); + assertTrue(result3.mayHaveMoreCellsInRow()); } @@ -928,7 +929,7 @@ public class TestPartialResultsFromClientSide { assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result1.rawCells().length); Cell c1 = result1.rawCells()[0]; assertCell(c1, ROWS[0], FAMILIES[0], QUALIFIERS[0]); - assertFalse(result1.hasMoreCellsInRow()); + assertFalse(result1.mayHaveMoreCellsInRow()); moveRegion(table, 2); @@ -936,7 +937,7 @@ public class TestPartialResultsFromClientSide { assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result2.rawCells().length); Cell c2 = result2.rawCells()[0]; assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]); - assertFalse(result2.hasMoreCellsInRow()); + assertFalse(result2.mayHaveMoreCellsInRow()); moveRegion(table, 3); @@ -944,7 +945,7 @@ public class TestPartialResultsFromClientSide { assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result3.rawCells().length); Cell c3 = result3.rawCells()[0]; assertCell(c3, ROWS[2], FAMILIES[0], QUALIFIERS[0]); - assertFalse(result3.hasMoreCellsInRow()); + assertFalse(result3.mayHaveMoreCellsInRow()); } @@ -965,7 +966,7 @@ public class TestPartialResultsFromClientSide { assertEquals(NUM_FAMILIES*NUM_QUALIFIERS, result1.rawCells().length); Cell c1 = result1.rawCells()[0]; assertCell(c1, ROWS[NUM_ROWS-1], FAMILIES[0], QUALIFIERS[0]); - assertFalse(result1.hasMoreCellsInRow()); + assertFalse(result1.mayHaveMoreCellsInRow()); moveRegion(table, 2); @@ -973,7 +974,7 @@ public class TestPartialResultsFromClientSide { assertEquals(NUM_FAMILIES*NUM_QUALIFIERS, result2.rawCells().length); Cell c2 = result2.rawCells()[0]; assertCell(c2, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[0]); - assertFalse(result2.hasMoreCellsInRow()); + assertFalse(result2.mayHaveMoreCellsInRow()); moveRegion(table, 3); @@ -981,43 +982,85 @@ public class TestPartialResultsFromClientSide { assertEquals(NUM_FAMILIES*NUM_QUALIFIERS, result3.rawCells().length); Cell c3 = result3.rawCells()[0]; assertCell(c3, ROWS[NUM_ROWS-3], FAMILIES[0], QUALIFIERS[0]); - assertFalse(result3.hasMoreCellsInRow()); + assertFalse(result3.mayHaveMoreCellsInRow()); } @Test public void testBatchingResultWhenRegionMove() throws IOException { + // If user setBatch(5) and rpc returns 3+5+5+5+3 cells, + // we should return 5+5+5+5+1 to user. + // setBatch doesn't mean setAllowPartialResult(true) Table table = createTestTable(TableName.valueOf(name.getMethodName()), ROWS, FAMILIES, QUALIFIERS, VALUE); + Put put = new Put(ROWS[1]); + put.addColumn(FAMILIES[0], QUALIFIERS[1], new byte[VALUE_SIZE * 10]); + table.put(put); + Delete delete = new Delete(ROWS[1]); + delete.addColumn(FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); + table.delete(delete); + moveRegion(table, 1); Scan scan = new Scan(); scan.setCaching(1); - scan.setBatch(1); + scan.setBatch(5); + scan.setMaxResultSize(VALUE_SIZE * 6); ResultScanner scanner = table.getScanner(scan); - for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS - 1; i++) { - scanner.next(); + for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS / 5 - 1; i++) { + assertTrue(scanner.next().mayHaveMoreCellsInRow()); } Result result1 = scanner.next(); - assertEquals(1, result1.rawCells().length); - Cell c1 = result1.rawCells()[0]; - assertCell(c1, ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); + assertEquals(5, result1.rawCells().length); + assertCell(result1.rawCells()[0], ROWS[0], FAMILIES[NUM_FAMILIES - 1], + QUALIFIERS[NUM_QUALIFIERS - 5]); + assertCell(result1.rawCells()[4], ROWS[0], FAMILIES[NUM_FAMILIES - 1], + QUALIFIERS[NUM_QUALIFIERS - 1]); + assertFalse(result1.mayHaveMoreCellsInRow()); moveRegion(table, 2); Result result2 = scanner.next(); - assertEquals(1, result2.rawCells().length); - Cell c2 = result2.rawCells()[0]; - assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]); + assertEquals(5, result2.rawCells().length); + assertCell(result2.rawCells()[0], ROWS[1], FAMILIES[0], QUALIFIERS[0]); + assertCell(result2.rawCells()[4], ROWS[1], FAMILIES[0], QUALIFIERS[4]); + assertTrue(result2.mayHaveMoreCellsInRow()); moveRegion(table, 3); Result result3 = scanner.next(); - assertEquals(1, result3.rawCells().length); - Cell c3 = result3.rawCells()[0]; - assertCell(c3, ROWS[1], FAMILIES[0], QUALIFIERS[1]); + assertEquals(5, result3.rawCells().length); + assertCell(result3.rawCells()[0], ROWS[1], FAMILIES[0], QUALIFIERS[5]); + assertCell(result3.rawCells()[4], ROWS[1], FAMILIES[0], QUALIFIERS[9]); + assertTrue(result3.mayHaveMoreCellsInRow()); + + for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS / 5 - 3; i++) { + Result result = scanner.next(); + assertEquals(5, result.rawCells().length); + assertTrue(result.mayHaveMoreCellsInRow()); + } + Result result = scanner.next(); + assertEquals(4, result.rawCells().length); + assertFalse(result.mayHaveMoreCellsInRow()); + + + for (int i = 2; i < NUM_ROWS; i++) { + for (int j = 0; j < NUM_FAMILIES; j++) { + for (int k = 0; k < NUM_QUALIFIERS; k += 5) { + result = scanner.next(); + assertCell(result.rawCells()[0], ROWS[i], FAMILIES[j], QUALIFIERS[k]); + assertEquals(5, result.rawCells().length); + if (j == NUM_FAMILIES - 1 && k == NUM_QUALIFIERS - 5) { + assertFalse(result.mayHaveMoreCellsInRow()); + } else { + assertTrue(result.mayHaveMoreCellsInRow()); + } + } + } + } + assertNull(scanner.next()); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide2.java index 728a8f92778..4da94f25677 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide2.java @@ -136,14 +136,14 @@ public class TestScannersFromClientSide2 { private List assertAndCreateCompleteResults(List results) throws IOException { if ((!batch && !allowPartial) || (allowPartial && !batch && !smallResultSize)) { for (Result result : results) { - assertFalse("Should not have partial result", result.hasMoreCellsInRow()); + assertFalse("Should not have partial result", result.mayHaveMoreCellsInRow()); } return results; } List completeResults = new ArrayList<>(); List partialResults = new ArrayList<>(); for (Result result : results) { - if (!result.hasMoreCellsInRow()) { + if (!result.mayHaveMoreCellsInRow()) { assertFalse("Should have partial result", partialResults.isEmpty()); partialResults.add(result); completeResults.add(Result.createCompleteResult(partialResults));