HBASE-15484 Correct the semantic of batch and partial
This commit is contained in:
parent
81cb298014
commit
0d3e986f7e
|
@ -404,7 +404,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
|
|||
|
||||
private void updateNextStartRowWhenError(Result result) {
|
||||
nextStartRowWhenError = result.getRow();
|
||||
includeNextStartRowWhenError = result.hasMoreCellsInRow();
|
||||
includeNextStartRowWhenError = result.mayHaveMoreCellsInRow();
|
||||
}
|
||||
|
||||
private void completeWhenNoMoreResultsInRegion() {
|
||||
|
|
|
@ -76,10 +76,11 @@ public abstract class ClientScanner extends AbstractClientScanner {
|
|||
* result.
|
||||
*/
|
||||
protected final LinkedList<Result> partialResults = new LinkedList<Result>();
|
||||
protected int partialResultsCellSizes = 0;
|
||||
/**
|
||||
* The row for which we are accumulating partial Results (i.e. the row of the Results stored
|
||||
* inside partialResults). Changes to partialResultsRow and partialResults are kept in sync via
|
||||
* the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()}
|
||||
* the methods {@link #regroupResults(Result)} and {@link #clearPartialResults()}
|
||||
*/
|
||||
protected byte[] partialResultsRow = null;
|
||||
/**
|
||||
|
@ -406,7 +407,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
|
|||
// If the lastRow is not partial, then we should start from the next row. As now we can
|
||||
// exclude the start row, the logic here is the same for both normal scan and reversed scan.
|
||||
// If lastResult is partial then include it, otherwise exclude it.
|
||||
scan.withStartRow(lastResult.getRow(), lastResult.hasMoreCellsInRow());
|
||||
scan.withStartRow(lastResult.getRow(), lastResult.mayHaveMoreCellsInRow());
|
||||
}
|
||||
if (e instanceof OutOfOrderScannerNextException) {
|
||||
if (retryAfterOutOfOrderException.isTrue()) {
|
||||
|
@ -497,7 +498,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
|
|||
remainingResultSize -= estimatedHeapSizeOfResult;
|
||||
addEstimatedSize(estimatedHeapSizeOfResult);
|
||||
this.lastResult = rs;
|
||||
if (this.lastResult.hasMoreCellsInRow()) {
|
||||
if (this.lastResult.mayHaveMoreCellsInRow()) {
|
||||
updateLastCellLoadedToCache(this.lastResult);
|
||||
} else {
|
||||
this.lastCellLoadedToCache = null;
|
||||
|
@ -588,16 +589,10 @@ public abstract class ClientScanner extends AbstractClientScanner {
|
|||
int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
|
||||
List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize);
|
||||
|
||||
final boolean isBatchSet = scan != null && scan.getBatch() > 0;
|
||||
final boolean allowPartials = scan != null && scan.getAllowPartialResults();
|
||||
|
||||
// If the caller has indicated in their scan that they are okay with seeing partial results,
|
||||
// then simply add all results to the list. Note that since scan batching also returns results
|
||||
// for a row in pieces we treat batch being set as equivalent to allowing partials. The
|
||||
// implication of treating batching as equivalent to partial results is that it is possible
|
||||
// the caller will receive a result back where the number of cells in the result is less than
|
||||
// the batch size even though it may not be the last group of cells for that row.
|
||||
if (allowPartials || isBatchSet) {
|
||||
// then simply add all results to the list. Note allowPartial and setBatch are not same, we can
|
||||
// return here if allow partials and we will handle batching later.
|
||||
if (scan.getAllowPartialResults()) {
|
||||
addResultsToList(resultsToAddToCache, resultsFromServer, 0,
|
||||
(null == resultsFromServer ? 0 : resultsFromServer.length));
|
||||
return resultsToAddToCache;
|
||||
|
@ -618,100 +613,69 @@ public abstract class ClientScanner extends AbstractClientScanner {
|
|||
return resultsToAddToCache;
|
||||
}
|
||||
|
||||
// In every RPC response there should be at most a single partial result. Furthermore, if
|
||||
// there is a partial result, it is guaranteed to be in the last position of the array.
|
||||
Result last = resultsFromServer[resultsFromServer.length - 1];
|
||||
Result partial = last.hasMoreCellsInRow() ? last : null;
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("number results from RPC: ").append(resultsFromServer.length).append(",");
|
||||
sb.append("partial != null: ").append(partial != null).append(",");
|
||||
sb.append("number of partials so far: ").append(partialResults.size());
|
||||
LOG.trace(sb.toString());
|
||||
}
|
||||
|
||||
// There are three possibilities cases that can occur while handling partial results
|
||||
//
|
||||
// 1. (partial != null && partialResults.isEmpty())
|
||||
// This is the first partial result that we have received. It should be added to
|
||||
// the list of partialResults and await the next RPC request at which point another
|
||||
// portion of the complete result will be received
|
||||
//
|
||||
// 2. !partialResults.isEmpty()
|
||||
// Since our partialResults list is not empty it means that we have been accumulating partial
|
||||
// Results for a particular row. We cannot form the complete/whole Result for that row until
|
||||
// all partials for the row have been received. Thus we loop through all of the Results
|
||||
// returned from the server and determine whether or not all partial Results for the row have
|
||||
// been received. We know that we have received all of the partial Results for the row when:
|
||||
// i) We notice a row change in the Results
|
||||
// ii) We see a Result for the partial row that is NOT marked as a partial Result
|
||||
//
|
||||
// 3. (partial == null && partialResults.isEmpty())
|
||||
// Business as usual. We are not accumulating partial results and there wasn't a partial result
|
||||
// in the RPC response. This means that all of the results we received from the server are
|
||||
// complete and can be added directly to the cache
|
||||
if (partial != null && partialResults.isEmpty()) {
|
||||
addToPartialResults(partial);
|
||||
|
||||
// Exclude the last result, it's a partial
|
||||
addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length - 1);
|
||||
} else if (!partialResults.isEmpty()) {
|
||||
for (int i = 0; i < resultsFromServer.length; i++) {
|
||||
Result result = resultsFromServer[i];
|
||||
|
||||
// This result is from the same row as the partial Results. Add it to the list of partials
|
||||
// and check if it was the last partial Result for that row
|
||||
if (Bytes.equals(partialResultsRow, result.getRow())) {
|
||||
addToPartialResults(result);
|
||||
|
||||
// If the result is not a partial, it is a signal to us that it is the last Result we
|
||||
// need to form the complete Result client-side
|
||||
if (!result.hasMoreCellsInRow()) {
|
||||
resultsToAddToCache.add(Result.createCompleteResult(partialResults));
|
||||
clearPartialResults();
|
||||
}
|
||||
} else {
|
||||
// The row of this result differs from the row of the partial results we have received so
|
||||
// far. If our list of partials isn't empty, this is a signal to form the complete Result
|
||||
// since the row has now changed
|
||||
if (!partialResults.isEmpty()) {
|
||||
resultsToAddToCache.add(Result.createCompleteResult(partialResults));
|
||||
clearPartialResults();
|
||||
}
|
||||
|
||||
// It's possible that in one response from the server we receive the final partial for
|
||||
// one row and receive a partial for a different row. Thus, make sure that all Results
|
||||
// are added to the proper list
|
||||
if (result.hasMoreCellsInRow()) {
|
||||
addToPartialResults(result);
|
||||
} else {
|
||||
resultsToAddToCache.add(result);
|
||||
}
|
||||
}
|
||||
for(Result result : resultsFromServer) {
|
||||
if (partialResultsRow != null && Bytes.compareTo(result.getRow(), partialResultsRow) != 0) {
|
||||
// We have a new row, complete the previous row.
|
||||
resultsToAddToCache.add(Result.createCompleteResult(partialResults));
|
||||
clearPartialResults();
|
||||
}
|
||||
Result res = regroupResults(result);
|
||||
if (res != null) {
|
||||
resultsToAddToCache.add(res);
|
||||
}
|
||||
if (!result.mayHaveMoreCellsInRow()) {
|
||||
// We are done for this row
|
||||
if (partialResultsCellSizes > 0) {
|
||||
resultsToAddToCache.add(Result.createCompleteResult(partialResults));
|
||||
}
|
||||
clearPartialResults();
|
||||
}
|
||||
} else { // partial == null && partialResults.isEmpty() -- business as usual
|
||||
addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length);
|
||||
}
|
||||
|
||||
|
||||
return resultsToAddToCache;
|
||||
}
|
||||
|
||||
/**
|
||||
* A convenience method for adding a Result to our list of partials. This method ensure that only
|
||||
* Results that belong to the same row as the other partials can be added to the list.
|
||||
* Add new result to the partial list and return a batched Result if caching size exceed
|
||||
* batching limit.
|
||||
* If user setBatch(5) and rpc returns 3+5+5+5+3 cells, we should return 5+5+5+5+1 to user.
|
||||
* setBatch doesn't mean setAllowPartialResult(true)
|
||||
* @param result The result that we want to add to our list of partial Results
|
||||
* @return the result if we have batch limit and there is one Result can be returned to user, or
|
||||
* null if we have not.
|
||||
* @throws IOException
|
||||
*/
|
||||
private void addToPartialResults(final Result result) throws IOException {
|
||||
final byte[] row = result.getRow();
|
||||
if (partialResultsRow != null && !Bytes.equals(row, partialResultsRow)) {
|
||||
throw new IOException("Partial result row does not match. All partial results must come " +
|
||||
"from the same row. partialResultsRow: " + Bytes.toString(partialResultsRow) + "row: " +
|
||||
Bytes.toString(row));
|
||||
}
|
||||
partialResultsRow = row;
|
||||
private Result regroupResults(final Result result) throws IOException {
|
||||
partialResultsRow = result.getRow();
|
||||
partialResults.add(result);
|
||||
partialResultsCellSizes += result.size();
|
||||
if (scan.getBatch() > 0 && partialResultsCellSizes >= scan.getBatch()) {
|
||||
Cell[] cells = new Cell[scan.getBatch()];
|
||||
int count = 0;
|
||||
boolean stale = false;
|
||||
while (count < scan.getBatch()) {
|
||||
Result res = partialResults.poll();
|
||||
stale = stale || res.isStale();
|
||||
if (res.size() + count <= scan.getBatch()) {
|
||||
System.arraycopy(res.rawCells(), 0, cells, count, res.size());
|
||||
count += res.size();
|
||||
} else {
|
||||
int len = scan.getBatch() - count;
|
||||
System.arraycopy(res.rawCells(), 0, cells, count, len);
|
||||
Cell[] remainingCells = new Cell[res.size() - len];
|
||||
System.arraycopy(res.rawCells(), len, remainingCells, 0, res.size() - len);
|
||||
Result remainingRes = Result.create(remainingCells, res.getExists(), res.isStale(),
|
||||
res.mayHaveMoreCellsInRow());
|
||||
partialResults.addFirst(remainingRes);
|
||||
count = scan.getBatch();
|
||||
}
|
||||
}
|
||||
partialResultsCellSizes -= scan.getBatch();
|
||||
return Result.create(cells, null, stale,
|
||||
partialResultsCellSizes > 0 || result.mayHaveMoreCellsInRow());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -719,6 +683,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
|
|||
*/
|
||||
private void clearPartialResults() {
|
||||
partialResults.clear();
|
||||
partialResultsCellSizes = 0;
|
||||
partialResultsRow = null;
|
||||
}
|
||||
|
||||
|
@ -825,7 +790,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
|
|||
index++;
|
||||
}
|
||||
Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length);
|
||||
return Result.create(list, result.getExists(), result.isStale(), result.hasMoreCellsInRow());
|
||||
return Result.create(list, result.getExists(), result.isStale(), result.mayHaveMoreCellsInRow());
|
||||
}
|
||||
|
||||
protected void initCache() {
|
||||
|
|
|
@ -76,7 +76,7 @@ class CompleteScanResultCache implements ScanResultCache {
|
|||
// In every RPC response there should be at most a single partial result. Furthermore, if
|
||||
// there is a partial result, it is guaranteed to be in the last position of the array.
|
||||
Result last = results[results.length - 1];
|
||||
if (last.hasMoreCellsInRow()) {
|
||||
if (last.mayHaveMoreCellsInRow()) {
|
||||
if (partialResults.isEmpty()) {
|
||||
partialResults.add(last);
|
||||
return Arrays.copyOf(results, results.length - 1);
|
||||
|
|
|
@ -322,7 +322,7 @@ public final class ConnectionUtils {
|
|||
return null;
|
||||
}
|
||||
return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null,
|
||||
result.isStale(), result.hasMoreCellsInRow());
|
||||
result.isStale(), result.mayHaveMoreCellsInRow());
|
||||
}
|
||||
|
||||
// Add a delta to avoid timeout immediately after a retry sleeping.
|
||||
|
@ -396,6 +396,6 @@ public final class ConnectionUtils {
|
|||
* </ol>
|
||||
*/
|
||||
public static int numberOfIndividualRows(List<Result> results) {
|
||||
return (int) results.stream().filter(r -> !r.hasMoreCellsInRow()).count();
|
||||
return (int) results.stream().filter(r -> !r.mayHaveMoreCellsInRow()).count();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -84,9 +84,9 @@ public class Result implements CellScannable, CellScanner {
|
|||
private boolean stale = false;
|
||||
|
||||
/**
|
||||
* See {@link #hasMoreCellsInRow()}.
|
||||
* See {@link #mayHaveMoreCellsInRow()}.
|
||||
*/
|
||||
private boolean hasMoreCellsInRow = false;
|
||||
private boolean mayHaveMoreCellsInRow = false;
|
||||
// We're not using java serialization. Transient here is just a marker to say
|
||||
// that this is where we cache row if we're ever asked for it.
|
||||
private transient byte [] row = null;
|
||||
|
@ -144,11 +144,12 @@ public class Result implements CellScannable, CellScanner {
|
|||
return create(cells, exists, stale, false);
|
||||
}
|
||||
|
||||
public static Result create(List<Cell> cells, Boolean exists, boolean stale, boolean partial) {
|
||||
public static Result create(List<Cell> cells, Boolean exists, boolean stale,
|
||||
boolean hasMoreCellsInRow) {
|
||||
if (exists != null){
|
||||
return new Result(null, exists, stale, partial);
|
||||
return new Result(null, exists, stale, hasMoreCellsInRow);
|
||||
}
|
||||
return new Result(cells.toArray(new Cell[cells.size()]), null, stale, partial);
|
||||
return new Result(cells.toArray(new Cell[cells.size()]), null, stale, hasMoreCellsInRow);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -177,7 +178,7 @@ public class Result implements CellScannable, CellScanner {
|
|||
this.cells = cells;
|
||||
this.exists = exists;
|
||||
this.stale = stale;
|
||||
this.hasMoreCellsInRow = mayHaveMoreCellsInRow;
|
||||
this.mayHaveMoreCellsInRow = mayHaveMoreCellsInRow;
|
||||
this.readonly = false;
|
||||
}
|
||||
|
||||
|
@ -822,7 +823,7 @@ public class Result implements CellScannable, CellScanner {
|
|||
// Result1: -1- -2- (2 cells, size limit reached, mark as partial)
|
||||
// Result2: -3- -4- (2 cells, size limit reached, mark as partial)
|
||||
// Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial)
|
||||
if (i != (partialResults.size() - 1) && !r.hasMoreCellsInRow()) {
|
||||
if (i != (partialResults.size() - 1) && !r.mayHaveMoreCellsInRow()) {
|
||||
throw new IOException(
|
||||
"Cannot form complete result. Result is missing partial flag. " +
|
||||
"Partial Results: " + partialResults);
|
||||
|
@ -909,13 +910,13 @@ public class Result implements CellScannable, CellScanner {
|
|||
* for a row and should be combined with a result representing the remaining cells in that row to
|
||||
* form a complete (non-partial) result.
|
||||
* @return Whether or not the result is a partial result
|
||||
* @deprecated the word 'partial' ambiguous, use {@link #hasMoreCellsInRow()} instead.
|
||||
* @deprecated the word 'partial' ambiguous, use {@link #mayHaveMoreCellsInRow()} instead.
|
||||
* Deprecated since 1.4.0.
|
||||
* @see #hasMoreCellsInRow()
|
||||
* @see #mayHaveMoreCellsInRow()
|
||||
*/
|
||||
@Deprecated
|
||||
public boolean isPartial() {
|
||||
return hasMoreCellsInRow;
|
||||
return mayHaveMoreCellsInRow;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -927,8 +928,8 @@ public class Result implements CellScannable, CellScanner {
|
|||
* {@link Scan#setMaxResultSize(long)} and the default value can be seen here:
|
||||
* {@link HConstants#DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE}
|
||||
*/
|
||||
public boolean hasMoreCellsInRow() {
|
||||
return hasMoreCellsInRow;
|
||||
public boolean mayHaveMoreCellsInRow() {
|
||||
return mayHaveMoreCellsInRow;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -324,7 +324,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
|
|||
// 2. The last result was not a partial result which means it contained all of the cells for
|
||||
// that row (we no longer need any information from it). Set the start row to the next
|
||||
// closest row that could be seen.
|
||||
callable.getScan().withStartRow(this.lastResult.getRow(), this.lastResult.hasMoreCellsInRow());
|
||||
callable.getScan().withStartRow(this.lastResult.getRow(), this.lastResult.mayHaveMoreCellsInRow());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
|
|
@ -1329,7 +1329,7 @@ public final class ProtobufUtil {
|
|||
}
|
||||
|
||||
builder.setStale(result.isStale());
|
||||
builder.setPartial(result.hasMoreCellsInRow());
|
||||
builder.setPartial(result.mayHaveMoreCellsInRow());
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
|
|
@ -20,11 +20,9 @@ package org.apache.hadoop.hbase.shaded.protobuf;
|
|||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.Method;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
|
@ -74,7 +72,6 @@ import org.apache.hadoop.hbase.client.Put;
|
|||
import org.apache.hadoop.hbase.client.RegionLoadStats;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Scan.ReadType;
|
||||
import org.apache.hadoop.hbase.client.SnapshotDescription;
|
||||
import org.apache.hadoop.hbase.client.SnapshotType;
|
||||
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
||||
|
@ -92,8 +89,6 @@ import org.apache.hadoop.hbase.quotas.QuotaType;
|
|||
import org.apache.hadoop.hbase.quotas.ThrottleType;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationLoadSink;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.visibility.Authorizations;
|
||||
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
|
||||
|
@ -159,7 +154,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDe
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
|
||||
|
@ -171,7 +165,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescript
|
|||
import org.apache.hadoop.hbase.util.Addressing;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.DynamicClassLoader;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.ExceptionUtil;
|
||||
import org.apache.hadoop.hbase.util.Methods;
|
||||
import org.apache.hadoop.hbase.util.VersionInfo;
|
||||
|
@ -1443,7 +1436,7 @@ public final class ProtobufUtil {
|
|||
}
|
||||
|
||||
builder.setStale(result.isStale());
|
||||
builder.setPartial(result.hasMoreCellsInRow());
|
||||
builder.setPartial(result.mayHaveMoreCellsInRow());
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
|
|
@ -239,7 +239,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
}
|
||||
}
|
||||
|
||||
private void clearQueue() {
|
||||
protected void clearQueue() {
|
||||
// Remove Servers
|
||||
for (int i = 0; i < serverBuckets.length; ++i) {
|
||||
clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
|
||||
|
|
|
@ -6183,7 +6183,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
// Ok, we are good, let's try to get some results from the main heap.
|
||||
populateResult(results, this.storeHeap, scannerContext, current);
|
||||
|
||||
if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
|
||||
if (hasFilterRow) {
|
||||
throw new IncompatibleFilterException(
|
||||
|
|
|
@ -42,6 +42,8 @@ public interface KeyValueScanner extends Shipper, Closeable {
|
|||
|
||||
/**
|
||||
* Look at the next Cell in this scanner, but do not iterate scanner.
|
||||
* NOTICE: The returned cell has not been passed into ScanQueryMatcher. So it may not be what the
|
||||
* user need.
|
||||
* @return the next Cell
|
||||
*/
|
||||
Cell peek();
|
||||
|
|
|
@ -486,7 +486,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
if (clientCellBlockSupported) {
|
||||
for (Result res : results) {
|
||||
builder.addCellsPerResult(res.size());
|
||||
builder.addPartialFlagPerResult(res.hasMoreCellsInRow());
|
||||
builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
|
||||
}
|
||||
controller.setCellScanner(CellUtil.createCellScanner(results));
|
||||
} else {
|
||||
|
@ -3070,7 +3070,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
// is false. Can remove the isEmpty check after we get rid of the old implementation.
|
||||
moreResults = false;
|
||||
} else if (limitOfRows > 0 && !results.isEmpty() &&
|
||||
!results.get(results.size() - 1).hasMoreCellsInRow() &&
|
||||
!results.get(results.size() - 1).mayHaveMoreCellsInRow() &&
|
||||
ConnectionUtils.numberOfIndividualRows(results) >= limitOfRows) {
|
||||
// if we have reached the limit of rows
|
||||
moreResults = false;
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase;
|
|||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
|
@ -158,7 +159,7 @@ public class TestPartialResultsFromClientSide {
|
|||
message = "Ensuring the expected keyValues are present for row " + row;
|
||||
List<Cell> expectedKeyValues = createKeyValuesForRow(ROWS[row], FAMILIES, QUALIFIERS, VALUE);
|
||||
Result result = partialScanner.next();
|
||||
assertFalse(result.hasMoreCellsInRow());
|
||||
assertFalse(result.mayHaveMoreCellsInRow());
|
||||
verifyResult(result, expectedKeyValues, message);
|
||||
}
|
||||
|
||||
|
@ -178,7 +179,7 @@ public class TestPartialResultsFromClientSide {
|
|||
Result result = scanner.next();
|
||||
|
||||
assertTrue(result != null);
|
||||
assertTrue(result.hasMoreCellsInRow());
|
||||
assertTrue(result.mayHaveMoreCellsInRow());
|
||||
assertTrue(result.rawCells() != null);
|
||||
assertTrue(result.rawCells().length == 1);
|
||||
|
||||
|
@ -189,7 +190,7 @@ public class TestPartialResultsFromClientSide {
|
|||
result = scanner.next();
|
||||
|
||||
assertTrue(result != null);
|
||||
assertTrue(!result.hasMoreCellsInRow());
|
||||
assertTrue(!result.mayHaveMoreCellsInRow());
|
||||
assertTrue(result.rawCells() != null);
|
||||
assertTrue(result.rawCells().length == NUM_COLS);
|
||||
|
||||
|
@ -283,7 +284,7 @@ public class TestPartialResultsFromClientSide {
|
|||
for (Cell c : partialResult.rawCells()) {
|
||||
aggregatePartialCells.add(c);
|
||||
}
|
||||
} while (partialResult.hasMoreCellsInRow());
|
||||
} while (partialResult.mayHaveMoreCellsInRow());
|
||||
|
||||
assertTrue("Number of cells differs. iteration: " + iterationCount,
|
||||
oneShotResult.rawCells().length == aggregatePartialCells.size());
|
||||
|
@ -353,7 +354,7 @@ public class TestPartialResultsFromClientSide {
|
|||
// the last group of cells that fit inside the maxResultSize
|
||||
assertTrue(
|
||||
"Result's cell count differed from expected number. result: " + result,
|
||||
result.rawCells().length == expectedNumberOfCells || !result.hasMoreCellsInRow()
|
||||
result.rawCells().length == expectedNumberOfCells || !result.mayHaveMoreCellsInRow()
|
||||
|| !Bytes.equals(prevRow, result.getRow()));
|
||||
prevRow = result.getRow();
|
||||
}
|
||||
|
@ -431,11 +432,11 @@ public class TestPartialResultsFromClientSide {
|
|||
while ((result = scanner.next()) != null) {
|
||||
assertTrue(result.rawCells() != null);
|
||||
|
||||
if (result.hasMoreCellsInRow()) {
|
||||
if (result.mayHaveMoreCellsInRow()) {
|
||||
final String error =
|
||||
"Cells:" + result.rawCells().length + " Batch size:" + batch
|
||||
+ " cellsPerPartialResult:" + cellsPerPartialResult + " rep:" + repCount;
|
||||
assertTrue(error, result.rawCells().length <= Math.min(batch, cellsPerPartialResult));
|
||||
assertTrue(error, result.rawCells().length == batch);
|
||||
} else {
|
||||
assertTrue(result.rawCells().length <= batch);
|
||||
}
|
||||
|
@ -477,7 +478,7 @@ public class TestPartialResultsFromClientSide {
|
|||
do {
|
||||
partialResult = partialScanner.next();
|
||||
partials.add(partialResult);
|
||||
} while (partialResult != null && partialResult.hasMoreCellsInRow());
|
||||
} while (partialResult != null && partialResult.mayHaveMoreCellsInRow());
|
||||
|
||||
completeResult = Result.createCompleteResult(partials);
|
||||
oneShotResult = oneShotScanner.next();
|
||||
|
@ -538,7 +539,7 @@ public class TestPartialResultsFromClientSide {
|
|||
|
||||
Result r = null;
|
||||
while ((r = scanner.next()) != null) {
|
||||
assertFalse(r.hasMoreCellsInRow());
|
||||
assertFalse(r.mayHaveMoreCellsInRow());
|
||||
}
|
||||
|
||||
scanner.close();
|
||||
|
@ -588,7 +589,7 @@ public class TestPartialResultsFromClientSide {
|
|||
// hit before the caching limit and thus partial results may be seen
|
||||
boolean expectToSeePartialResults = resultSizeRowLimit < cachingRowLimit;
|
||||
while ((r = clientScanner.next()) != null) {
|
||||
assertTrue(!r.hasMoreCellsInRow() || expectToSeePartialResults);
|
||||
assertTrue(!r.mayHaveMoreCellsInRow() || expectToSeePartialResults);
|
||||
}
|
||||
|
||||
scanner.close();
|
||||
|
@ -853,7 +854,7 @@ public class TestPartialResultsFromClientSide {
|
|||
assertEquals(1, result1.rawCells().length);
|
||||
Cell c1 = result1.rawCells()[0];
|
||||
assertCell(c1, ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]);
|
||||
assertFalse(result1.hasMoreCellsInRow());
|
||||
assertFalse(result1.mayHaveMoreCellsInRow());
|
||||
|
||||
moveRegion(table, 2);
|
||||
|
||||
|
@ -861,7 +862,7 @@ public class TestPartialResultsFromClientSide {
|
|||
assertEquals(1, result2.rawCells().length);
|
||||
Cell c2 = result2.rawCells()[0];
|
||||
assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]);
|
||||
assertTrue(result2.hasMoreCellsInRow());
|
||||
assertTrue(result2.mayHaveMoreCellsInRow());
|
||||
|
||||
moveRegion(table, 3);
|
||||
|
||||
|
@ -869,7 +870,7 @@ public class TestPartialResultsFromClientSide {
|
|||
assertEquals(1, result3.rawCells().length);
|
||||
Cell c3 = result3.rawCells()[0];
|
||||
assertCell(c3, ROWS[1], FAMILIES[0], QUALIFIERS[1]);
|
||||
assertTrue(result3.hasMoreCellsInRow());
|
||||
assertTrue(result3.mayHaveMoreCellsInRow());
|
||||
|
||||
}
|
||||
|
||||
|
@ -892,7 +893,7 @@ public class TestPartialResultsFromClientSide {
|
|||
assertEquals(1, result1.rawCells().length);
|
||||
Cell c1 = result1.rawCells()[0];
|
||||
assertCell(c1, ROWS[NUM_ROWS-1], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]);
|
||||
assertFalse(result1.hasMoreCellsInRow());
|
||||
assertFalse(result1.mayHaveMoreCellsInRow());
|
||||
|
||||
moveRegion(table, 2);
|
||||
|
||||
|
@ -900,7 +901,7 @@ public class TestPartialResultsFromClientSide {
|
|||
assertEquals(1, result2.rawCells().length);
|
||||
Cell c2 = result2.rawCells()[0];
|
||||
assertCell(c2, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[0]);
|
||||
assertTrue(result2.hasMoreCellsInRow());
|
||||
assertTrue(result2.mayHaveMoreCellsInRow());
|
||||
|
||||
moveRegion(table, 3);
|
||||
|
||||
|
@ -908,7 +909,7 @@ public class TestPartialResultsFromClientSide {
|
|||
assertEquals(1, result3.rawCells().length);
|
||||
Cell c3 = result3.rawCells()[0];
|
||||
assertCell(c3, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[1]);
|
||||
assertTrue(result3.hasMoreCellsInRow());
|
||||
assertTrue(result3.mayHaveMoreCellsInRow());
|
||||
|
||||
}
|
||||
|
||||
|
@ -928,7 +929,7 @@ public class TestPartialResultsFromClientSide {
|
|||
assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result1.rawCells().length);
|
||||
Cell c1 = result1.rawCells()[0];
|
||||
assertCell(c1, ROWS[0], FAMILIES[0], QUALIFIERS[0]);
|
||||
assertFalse(result1.hasMoreCellsInRow());
|
||||
assertFalse(result1.mayHaveMoreCellsInRow());
|
||||
|
||||
moveRegion(table, 2);
|
||||
|
||||
|
@ -936,7 +937,7 @@ public class TestPartialResultsFromClientSide {
|
|||
assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result2.rawCells().length);
|
||||
Cell c2 = result2.rawCells()[0];
|
||||
assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]);
|
||||
assertFalse(result2.hasMoreCellsInRow());
|
||||
assertFalse(result2.mayHaveMoreCellsInRow());
|
||||
|
||||
moveRegion(table, 3);
|
||||
|
||||
|
@ -944,7 +945,7 @@ public class TestPartialResultsFromClientSide {
|
|||
assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result3.rawCells().length);
|
||||
Cell c3 = result3.rawCells()[0];
|
||||
assertCell(c3, ROWS[2], FAMILIES[0], QUALIFIERS[0]);
|
||||
assertFalse(result3.hasMoreCellsInRow());
|
||||
assertFalse(result3.mayHaveMoreCellsInRow());
|
||||
|
||||
}
|
||||
|
||||
|
@ -965,7 +966,7 @@ public class TestPartialResultsFromClientSide {
|
|||
assertEquals(NUM_FAMILIES*NUM_QUALIFIERS, result1.rawCells().length);
|
||||
Cell c1 = result1.rawCells()[0];
|
||||
assertCell(c1, ROWS[NUM_ROWS-1], FAMILIES[0], QUALIFIERS[0]);
|
||||
assertFalse(result1.hasMoreCellsInRow());
|
||||
assertFalse(result1.mayHaveMoreCellsInRow());
|
||||
|
||||
moveRegion(table, 2);
|
||||
|
||||
|
@ -973,7 +974,7 @@ public class TestPartialResultsFromClientSide {
|
|||
assertEquals(NUM_FAMILIES*NUM_QUALIFIERS, result2.rawCells().length);
|
||||
Cell c2 = result2.rawCells()[0];
|
||||
assertCell(c2, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[0]);
|
||||
assertFalse(result2.hasMoreCellsInRow());
|
||||
assertFalse(result2.mayHaveMoreCellsInRow());
|
||||
|
||||
moveRegion(table, 3);
|
||||
|
||||
|
@ -981,43 +982,85 @@ public class TestPartialResultsFromClientSide {
|
|||
assertEquals(NUM_FAMILIES*NUM_QUALIFIERS, result3.rawCells().length);
|
||||
Cell c3 = result3.rawCells()[0];
|
||||
assertCell(c3, ROWS[NUM_ROWS-3], FAMILIES[0], QUALIFIERS[0]);
|
||||
assertFalse(result3.hasMoreCellsInRow());
|
||||
assertFalse(result3.mayHaveMoreCellsInRow());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBatchingResultWhenRegionMove() throws IOException {
|
||||
// If user setBatch(5) and rpc returns 3+5+5+5+3 cells,
|
||||
// we should return 5+5+5+5+1 to user.
|
||||
// setBatch doesn't mean setAllowPartialResult(true)
|
||||
Table table = createTestTable(TableName.valueOf(name.getMethodName()), ROWS, FAMILIES,
|
||||
QUALIFIERS, VALUE);
|
||||
|
||||
Put put = new Put(ROWS[1]);
|
||||
put.addColumn(FAMILIES[0], QUALIFIERS[1], new byte[VALUE_SIZE * 10]);
|
||||
table.put(put);
|
||||
Delete delete = new Delete(ROWS[1]);
|
||||
delete.addColumn(FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]);
|
||||
table.delete(delete);
|
||||
|
||||
moveRegion(table, 1);
|
||||
|
||||
Scan scan = new Scan();
|
||||
scan.setCaching(1);
|
||||
scan.setBatch(1);
|
||||
scan.setBatch(5);
|
||||
scan.setMaxResultSize(VALUE_SIZE * 6);
|
||||
|
||||
ResultScanner scanner = table.getScanner(scan);
|
||||
for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS - 1; i++) {
|
||||
scanner.next();
|
||||
for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS / 5 - 1; i++) {
|
||||
assertTrue(scanner.next().mayHaveMoreCellsInRow());
|
||||
}
|
||||
Result result1 = scanner.next();
|
||||
assertEquals(1, result1.rawCells().length);
|
||||
Cell c1 = result1.rawCells()[0];
|
||||
assertCell(c1, ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]);
|
||||
assertEquals(5, result1.rawCells().length);
|
||||
assertCell(result1.rawCells()[0], ROWS[0], FAMILIES[NUM_FAMILIES - 1],
|
||||
QUALIFIERS[NUM_QUALIFIERS - 5]);
|
||||
assertCell(result1.rawCells()[4], ROWS[0], FAMILIES[NUM_FAMILIES - 1],
|
||||
QUALIFIERS[NUM_QUALIFIERS - 1]);
|
||||
assertFalse(result1.mayHaveMoreCellsInRow());
|
||||
|
||||
moveRegion(table, 2);
|
||||
|
||||
Result result2 = scanner.next();
|
||||
assertEquals(1, result2.rawCells().length);
|
||||
Cell c2 = result2.rawCells()[0];
|
||||
assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]);
|
||||
assertEquals(5, result2.rawCells().length);
|
||||
assertCell(result2.rawCells()[0], ROWS[1], FAMILIES[0], QUALIFIERS[0]);
|
||||
assertCell(result2.rawCells()[4], ROWS[1], FAMILIES[0], QUALIFIERS[4]);
|
||||
assertTrue(result2.mayHaveMoreCellsInRow());
|
||||
|
||||
moveRegion(table, 3);
|
||||
|
||||
Result result3 = scanner.next();
|
||||
assertEquals(1, result3.rawCells().length);
|
||||
Cell c3 = result3.rawCells()[0];
|
||||
assertCell(c3, ROWS[1], FAMILIES[0], QUALIFIERS[1]);
|
||||
assertEquals(5, result3.rawCells().length);
|
||||
assertCell(result3.rawCells()[0], ROWS[1], FAMILIES[0], QUALIFIERS[5]);
|
||||
assertCell(result3.rawCells()[4], ROWS[1], FAMILIES[0], QUALIFIERS[9]);
|
||||
assertTrue(result3.mayHaveMoreCellsInRow());
|
||||
|
||||
for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS / 5 - 3; i++) {
|
||||
Result result = scanner.next();
|
||||
assertEquals(5, result.rawCells().length);
|
||||
assertTrue(result.mayHaveMoreCellsInRow());
|
||||
}
|
||||
Result result = scanner.next();
|
||||
assertEquals(4, result.rawCells().length);
|
||||
assertFalse(result.mayHaveMoreCellsInRow());
|
||||
|
||||
|
||||
for (int i = 2; i < NUM_ROWS; i++) {
|
||||
for (int j = 0; j < NUM_FAMILIES; j++) {
|
||||
for (int k = 0; k < NUM_QUALIFIERS; k += 5) {
|
||||
result = scanner.next();
|
||||
assertCell(result.rawCells()[0], ROWS[i], FAMILIES[j], QUALIFIERS[k]);
|
||||
assertEquals(5, result.rawCells().length);
|
||||
if (j == NUM_FAMILIES - 1 && k == NUM_QUALIFIERS - 5) {
|
||||
assertFalse(result.mayHaveMoreCellsInRow());
|
||||
} else {
|
||||
assertTrue(result.mayHaveMoreCellsInRow());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
assertNull(scanner.next());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -136,14 +136,14 @@ public class TestScannersFromClientSide2 {
|
|||
private List<Result> assertAndCreateCompleteResults(List<Result> results) throws IOException {
|
||||
if ((!batch && !allowPartial) || (allowPartial && !batch && !smallResultSize)) {
|
||||
for (Result result : results) {
|
||||
assertFalse("Should not have partial result", result.hasMoreCellsInRow());
|
||||
assertFalse("Should not have partial result", result.mayHaveMoreCellsInRow());
|
||||
}
|
||||
return results;
|
||||
}
|
||||
List<Result> completeResults = new ArrayList<>();
|
||||
List<Result> partialResults = new ArrayList<>();
|
||||
for (Result result : results) {
|
||||
if (!result.hasMoreCellsInRow()) {
|
||||
if (!result.mayHaveMoreCellsInRow()) {
|
||||
assertFalse("Should have partial result", partialResults.isEmpty());
|
||||
partialResults.add(result);
|
||||
completeResults.add(Result.createCompleteResult(partialResults));
|
||||
|
|
Loading…
Reference in New Issue