HBASE-15484 Correct the semantic of batch and partial

This commit is contained in:
Phil Yang 2017-03-06 15:26:47 +08:00
parent ccf1f17d78
commit 6afe696e53
9 changed files with 157 additions and 149 deletions

View File

@ -75,10 +75,11 @@ public abstract class ClientScanner extends AbstractClientScanner {
* result. * result.
*/ */
protected final LinkedList<Result> partialResults = new LinkedList<Result>(); protected final LinkedList<Result> partialResults = new LinkedList<Result>();
protected int partialResultsCellSizes = 0;
/** /**
* The row for which we are accumulating partial Results (i.e. the row of the Results stored * The row for which we are accumulating partial Results (i.e. the row of the Results stored
* inside partialResults). Changes to partialResultsRow and partialResults are kept in sync via * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync via
* the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()} * the methods {@link #regroupResults(Result)} and {@link #clearPartialResults()}
*/ */
protected byte[] partialResultsRow = null; protected byte[] partialResultsRow = null;
/** /**
@ -413,7 +414,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
// If the lastRow is not partial, then we should start from the next row. As now we can // If the lastRow is not partial, then we should start from the next row. As now we can
// exclude the start row, the logic here is the same for both normal scan and reversed scan. // exclude the start row, the logic here is the same for both normal scan and reversed scan.
// If lastResult is partial then include it, otherwise exclude it. // If lastResult is partial then include it, otherwise exclude it.
scan.withStartRow(lastResult.getRow(), lastResult.hasMoreCellsInRow()); scan.withStartRow(lastResult.getRow(), lastResult.mayHaveMoreCellsInRow());
} }
if (e instanceof OutOfOrderScannerNextException) { if (e instanceof OutOfOrderScannerNextException) {
if (retryAfterOutOfOrderException.isTrue()) { if (retryAfterOutOfOrderException.isTrue()) {
@ -503,7 +504,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
} }
countdown--; countdown--;
this.lastResult = rs; this.lastResult = rs;
if (this.lastResult.hasMoreCellsInRow()) { if (this.lastResult.mayHaveMoreCellsInRow()) {
updateLastCellLoadedToCache(this.lastResult); updateLastCellLoadedToCache(this.lastResult);
} else { } else {
this.lastCellLoadedToCache = null; this.lastCellLoadedToCache = null;
@ -585,16 +586,10 @@ public abstract class ClientScanner extends AbstractClientScanner {
int resultSize = resultsFromServer != null ? resultsFromServer.length : 0; int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize); List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize);
final boolean isBatchSet = scan != null && scan.getBatch() > 0;
final boolean allowPartials = scan != null && scan.getAllowPartialResults();
// If the caller has indicated in their scan that they are okay with seeing partial results, // If the caller has indicated in their scan that they are okay with seeing partial results,
// then simply add all results to the list. Note that since scan batching also returns results // then simply add all results to the list. Note allowPartial and setBatch are not same, we can
// for a row in pieces we treat batch being set as equivalent to allowing partials. The // return here if allow partials and we will handle batching later.
// implication of treating batching as equivalent to partial results is that it is possible if (scan.getAllowPartialResults()) {
// the caller will receive a result back where the number of cells in the result is less than
// the batch size even though it may not be the last group of cells for that row.
if (allowPartials || isBatchSet) {
addResultsToList(resultsToAddToCache, resultsFromServer, 0, addResultsToList(resultsToAddToCache, resultsFromServer, 0,
(null == resultsFromServer ? 0 : resultsFromServer.length)); (null == resultsFromServer ? 0 : resultsFromServer.length));
return resultsToAddToCache; return resultsToAddToCache;
@ -615,100 +610,68 @@ public abstract class ClientScanner extends AbstractClientScanner {
return resultsToAddToCache; return resultsToAddToCache;
} }
// In every RPC response there should be at most a single partial result. Furthermore, if for(Result result : resultsFromServer) {
// there is a partial result, it is guaranteed to be in the last position of the array. if (partialResultsRow != null && Bytes.compareTo(result.getRow(), partialResultsRow) != 0) {
Result last = resultsFromServer[resultsFromServer.length - 1]; // We have a new row, complete the previous row.
Result partial = last.hasMoreCellsInRow() ? last : null; resultsToAddToCache.add(Result.createCompleteResult(partialResults));
clearPartialResults();
if (LOG.isTraceEnabled()) { }
StringBuilder sb = new StringBuilder(); Result res = regroupResults(result);
sb.append("number results from RPC: ").append(resultsFromServer.length).append(","); if (res != null) {
sb.append("partial != null: ").append(partial != null).append(","); resultsToAddToCache.add(res);
sb.append("number of partials so far: ").append(partialResults.size()); }
LOG.trace(sb.toString()); if (!result.mayHaveMoreCellsInRow()) {
} // We are done for this row
if (partialResultsCellSizes > 0) {
// There are three possibilities cases that can occur while handling partial results resultsToAddToCache.add(Result.createCompleteResult(partialResults));
// }
// 1. (partial != null && partialResults.isEmpty()) clearPartialResults();
// This is the first partial result that we have received. It should be added to
// the list of partialResults and await the next RPC request at which point another
// portion of the complete result will be received
//
// 2. !partialResults.isEmpty()
// Since our partialResults list is not empty it means that we have been accumulating partial
// Results for a particular row. We cannot form the complete/whole Result for that row until
// all partials for the row have been received. Thus we loop through all of the Results
// returned from the server and determine whether or not all partial Results for the row have
// been received. We know that we have received all of the partial Results for the row when:
// i) We notice a row change in the Results
// ii) We see a Result for the partial row that is NOT marked as a partial Result
//
// 3. (partial == null && partialResults.isEmpty())
// Business as usual. We are not accumulating partial results and there wasn't a partial result
// in the RPC response. This means that all of the results we received from the server are
// complete and can be added directly to the cache
if (partial != null && partialResults.isEmpty()) {
addToPartialResults(partial);
// Exclude the last result, it's a partial
addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length - 1);
} else if (!partialResults.isEmpty()) {
for (int i = 0; i < resultsFromServer.length; i++) {
Result result = resultsFromServer[i];
// This result is from the same row as the partial Results. Add it to the list of partials
// and check if it was the last partial Result for that row
if (Bytes.equals(partialResultsRow, result.getRow())) {
addToPartialResults(result);
// If the result is not a partial, it is a signal to us that it is the last Result we
// need to form the complete Result client-side
if (!result.hasMoreCellsInRow()) {
resultsToAddToCache.add(Result.createCompleteResult(partialResults));
clearPartialResults();
}
} else {
// The row of this result differs from the row of the partial results we have received so
// far. If our list of partials isn't empty, this is a signal to form the complete Result
// since the row has now changed
if (!partialResults.isEmpty()) {
resultsToAddToCache.add(Result.createCompleteResult(partialResults));
clearPartialResults();
}
// It's possible that in one response from the server we receive the final partial for
// one row and receive a partial for a different row. Thus, make sure that all Results
// are added to the proper list
if (result.hasMoreCellsInRow()) {
addToPartialResults(result);
} else {
resultsToAddToCache.add(result);
}
}
} }
} else { // partial == null && partialResults.isEmpty() -- business as usual
addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length);
} }
return resultsToAddToCache; return resultsToAddToCache;
} }
/** /**
* A convenience method for adding a Result to our list of partials. This method ensure that only * Add new result to the partial list and return a batched Result if caching size exceed
* Results that belong to the same row as the other partials can be added to the list. * batching limit.
* If user setBatch(5) and rpc returns 3+5+5+5+3 cells, we should return 5+5+5+5+1 to user.
* setBatch doesn't mean setAllowPartialResult(true)
* @param result The result that we want to add to our list of partial Results * @param result The result that we want to add to our list of partial Results
* @return the result if we have batch limit and there is one Result can be returned to user, or
* null if we have not.
* @throws IOException * @throws IOException
*/ */
private void addToPartialResults(final Result result) throws IOException { private Result regroupResults(final Result result) throws IOException {
final byte[] row = result.getRow(); partialResultsRow = result.getRow();
if (partialResultsRow != null && !Bytes.equals(row, partialResultsRow)) {
throw new IOException("Partial result row does not match. All partial results must come " +
"from the same row. partialResultsRow: " + Bytes.toString(partialResultsRow) + "row: " +
Bytes.toString(row));
}
partialResultsRow = row;
partialResults.add(result); partialResults.add(result);
partialResultsCellSizes += result.size();
if (scan.getBatch() > 0 && partialResultsCellSizes >= scan.getBatch()) {
Cell[] cells = new Cell[scan.getBatch()];
int count = 0;
boolean stale = false;
while (count < scan.getBatch()) {
Result res = partialResults.poll();
stale = stale || res.isStale();
if (res.size() + count <= scan.getBatch()) {
System.arraycopy(res.rawCells(), 0, cells, count, res.size());
count += res.size();
} else {
int len = scan.getBatch() - count;
System.arraycopy(res.rawCells(), 0, cells, count, len);
Cell[] remainingCells = new Cell[res.size() - len];
System.arraycopy(res.rawCells(), len, remainingCells, 0, res.size() - len);
Result remainingRes = Result.create(remainingCells, res.getExists(), res.isStale(),
res.mayHaveMoreCellsInRow());
partialResults.addFirst(remainingRes);
count = scan.getBatch();
}
}
partialResultsCellSizes -= scan.getBatch();
return Result.create(cells, null, stale,
partialResultsCellSizes > 0 || result.mayHaveMoreCellsInRow());
}
return null;
} }
/** /**
@ -716,6 +679,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
*/ */
private void clearPartialResults() { private void clearPartialResults() {
partialResults.clear(); partialResults.clear();
partialResultsCellSizes = 0;
partialResultsRow = null; partialResultsRow = null;
} }
@ -822,6 +786,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
index++; index++;
} }
Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length); Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length);
return Result.create(list, result.getExists(), result.isStale(), result.hasMoreCellsInRow()); return Result.create(list, result.getExists(), result.isStale(), result.mayHaveMoreCellsInRow());
} }
} }

View File

@ -281,7 +281,7 @@ public class ConnectionUtils {
public static int numberOfIndividualRows(List<Result> results) { public static int numberOfIndividualRows(List<Result> results) {
int count = 0; int count = 0;
for (Result result : results) { for (Result result : results) {
if (!result.hasMoreCellsInRow()) { if (!result.mayHaveMoreCellsInRow()) {
count++; count++;
} }
} }

View File

@ -83,9 +83,9 @@ public class Result implements CellScannable, CellScanner {
private boolean stale = false; private boolean stale = false;
/** /**
* See {@link #hasMoreCellsInRow()}. * See {@link #mayHaveMoreCellsInRow()}.
*/ */
private boolean hasMoreCellsInRow = false; private boolean mayHaveMoreCellsInRow = false;
// We're not using java serialization. Transient here is just a marker to say // We're not using java serialization. Transient here is just a marker to say
// that this is where we cache row if we're ever asked for it. // that this is where we cache row if we're ever asked for it.
private transient byte [] row = null; private transient byte [] row = null;
@ -192,7 +192,7 @@ public class Result implements CellScannable, CellScanner {
this.cells = cells; this.cells = cells;
this.exists = exists; this.exists = exists;
this.stale = stale; this.stale = stale;
this.hasMoreCellsInRow = mayHaveMoreCellsInRow; this.mayHaveMoreCellsInRow = mayHaveMoreCellsInRow;
this.readonly = false; this.readonly = false;
} }
@ -885,7 +885,7 @@ public class Result implements CellScannable, CellScanner {
// Result1: -1- -2- (2 cells, size limit reached, mark as partial) // Result1: -1- -2- (2 cells, size limit reached, mark as partial)
// Result2: -3- -4- (2 cells, size limit reached, mark as partial) // Result2: -3- -4- (2 cells, size limit reached, mark as partial)
// Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial) // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial)
if (i != (partialResults.size() - 1) && !r.hasMoreCellsInRow()) { if (i != (partialResults.size() - 1) && !r.mayHaveMoreCellsInRow()) {
throw new IOException( throw new IOException(
"Cannot form complete result. Result is missing partial flag. " + "Cannot form complete result. Result is missing partial flag. " +
"Partial Results: " + partialResults); "Partial Results: " + partialResults);
@ -972,13 +972,13 @@ public class Result implements CellScannable, CellScanner {
* for a row and should be combined with a result representing the remaining cells in that row to * for a row and should be combined with a result representing the remaining cells in that row to
* form a complete (non-partial) result. * form a complete (non-partial) result.
* @return Whether or not the result is a partial result * @return Whether or not the result is a partial result
* @deprecated the word 'partial' ambiguous, use {@link #hasMoreCellsInRow()} instead. * @deprecated the word 'partial' ambiguous, use {@link #mayHaveMoreCellsInRow()} instead.
* Deprecated since 1.4.0. * Deprecated since 1.4.0.
* @see #hasMoreCellsInRow() * @see #mayHaveMoreCellsInRow()
*/ */
@Deprecated @Deprecated
public boolean isPartial() { public boolean isPartial() {
return hasMoreCellsInRow; return mayHaveMoreCellsInRow;
} }
/** /**
@ -990,8 +990,8 @@ public class Result implements CellScannable, CellScanner {
* {@link Scan#setMaxResultSize(long)} and the default value can be seen here: * {@link Scan#setMaxResultSize(long)} and the default value can be seen here:
* {@link HConstants#DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE} * {@link HConstants#DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE}
*/ */
public boolean hasMoreCellsInRow() { public boolean mayHaveMoreCellsInRow() {
return hasMoreCellsInRow; return mayHaveMoreCellsInRow;
} }
/** /**

View File

@ -325,7 +325,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
// 2. The last result was not a partial result which means it contained all of the cells for // 2. The last result was not a partial result which means it contained all of the cells for
// that row (we no longer need any information from it). Set the start row to the next // that row (we no longer need any information from it). Set the start row to the next
// closest row that could be seen. // closest row that could be seen.
callable.getScan().withStartRow(this.lastResult.getRow(), this.lastResult.hasMoreCellsInRow()); callable.getScan().withStartRow(this.lastResult.getRow(), this.lastResult.mayHaveMoreCellsInRow());
} }
@VisibleForTesting @VisibleForTesting

View File

@ -1438,7 +1438,7 @@ public final class ProtobufUtil {
} }
builder.setStale(result.isStale()); builder.setStale(result.isStale());
builder.setPartial(result.hasMoreCellsInRow()); builder.setPartial(result.mayHaveMoreCellsInRow());
return builder.build(); return builder.build();
} }

View File

@ -42,6 +42,8 @@ public interface KeyValueScanner {
/** /**
* Look at the next Cell in this scanner, but do not iterate scanner. * Look at the next Cell in this scanner, but do not iterate scanner.
* NOTICE: The returned cell has not been passed into ScanQueryMatcher. So it may not be what the
* user need.
* @return the next Cell * @return the next Cell
*/ */
Cell peek(); Cell peek();

View File

@ -412,7 +412,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (isClientCellBlockSupport()) { if (isClientCellBlockSupport()) {
for (Result res : results) { for (Result res : results) {
builder.addCellsPerResult(res.size()); builder.addCellsPerResult(res.size());
builder.addPartialFlagPerResult(res.hasMoreCellsInRow()); builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow());
} }
controller.setCellScanner(CellUtil.createCellScanner(results)); controller.setCellScanner(CellUtil.createCellScanner(results));
} else { } else {
@ -2853,7 +2853,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// is false. Can remove the isEmpty check after we get rid of the old implementation. // is false. Can remove the isEmpty check after we get rid of the old implementation.
moreResults = false; moreResults = false;
} else if (limitOfRows > 0 && !results.isEmpty() && } else if (limitOfRows > 0 && !results.isEmpty() &&
!results.get(results.size() - 1).hasMoreCellsInRow() && !results.get(results.size() - 1).mayHaveMoreCellsInRow() &&
ConnectionUtils.numberOfIndividualRows(results) >= limitOfRows) { ConnectionUtils.numberOfIndividualRows(results) >= limitOfRows) {
// if we have reached the limit of rows // if we have reached the limit of rows
moreResults = false; moreResults = false;

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -152,7 +153,7 @@ public class TestPartialResultsFromClientSide {
message = "Ensuring the expected keyValues are present for row " + row; message = "Ensuring the expected keyValues are present for row " + row;
List<Cell> expectedKeyValues = createKeyValuesForRow(ROWS[row], FAMILIES, QUALIFIERS, VALUE); List<Cell> expectedKeyValues = createKeyValuesForRow(ROWS[row], FAMILIES, QUALIFIERS, VALUE);
Result result = partialScanner.next(); Result result = partialScanner.next();
assertFalse(result.hasMoreCellsInRow()); assertFalse(result.mayHaveMoreCellsInRow());
verifyResult(result, expectedKeyValues, message); verifyResult(result, expectedKeyValues, message);
} }
@ -172,7 +173,7 @@ public class TestPartialResultsFromClientSide {
Result result = scanner.next(); Result result = scanner.next();
assertTrue(result != null); assertTrue(result != null);
assertTrue(result.hasMoreCellsInRow()); assertTrue(result.mayHaveMoreCellsInRow());
assertTrue(result.rawCells() != null); assertTrue(result.rawCells() != null);
assertTrue(result.rawCells().length == 1); assertTrue(result.rawCells().length == 1);
@ -183,7 +184,7 @@ public class TestPartialResultsFromClientSide {
result = scanner.next(); result = scanner.next();
assertTrue(result != null); assertTrue(result != null);
assertTrue(!result.hasMoreCellsInRow()); assertTrue(!result.mayHaveMoreCellsInRow());
assertTrue(result.rawCells() != null); assertTrue(result.rawCells() != null);
assertTrue(result.rawCells().length == NUM_COLS); assertTrue(result.rawCells().length == NUM_COLS);
@ -277,7 +278,7 @@ public class TestPartialResultsFromClientSide {
for (Cell c : partialResult.rawCells()) { for (Cell c : partialResult.rawCells()) {
aggregatePartialCells.add(c); aggregatePartialCells.add(c);
} }
} while (partialResult.hasMoreCellsInRow()); } while (partialResult.mayHaveMoreCellsInRow());
assertTrue("Number of cells differs. iteration: " + iterationCount, assertTrue("Number of cells differs. iteration: " + iterationCount,
oneShotResult.rawCells().length == aggregatePartialCells.size()); oneShotResult.rawCells().length == aggregatePartialCells.size());
@ -347,7 +348,7 @@ public class TestPartialResultsFromClientSide {
// the last group of cells that fit inside the maxResultSize // the last group of cells that fit inside the maxResultSize
assertTrue( assertTrue(
"Result's cell count differed from expected number. result: " + result, "Result's cell count differed from expected number. result: " + result,
result.rawCells().length == expectedNumberOfCells || !result.hasMoreCellsInRow() result.rawCells().length == expectedNumberOfCells || !result.mayHaveMoreCellsInRow()
|| !Bytes.equals(prevRow, result.getRow())); || !Bytes.equals(prevRow, result.getRow()));
prevRow = result.getRow(); prevRow = result.getRow();
} }
@ -424,11 +425,11 @@ public class TestPartialResultsFromClientSide {
while ((result = scanner.next()) != null) { while ((result = scanner.next()) != null) {
assertTrue(result.rawCells() != null); assertTrue(result.rawCells() != null);
if (result.hasMoreCellsInRow()) { if (result.mayHaveMoreCellsInRow()) {
final String error = final String error =
"Cells:" + result.rawCells().length + " Batch size:" + batch "Cells:" + result.rawCells().length + " Batch size:" + batch
+ " cellsPerPartialResult:" + cellsPerPartialResult + " rep:" + repCount; + " cellsPerPartialResult:" + cellsPerPartialResult + " rep:" + repCount;
assertTrue(error, result.rawCells().length <= Math.min(batch, cellsPerPartialResult)); assertTrue(error, result.rawCells().length == batch);
} else { } else {
assertTrue(result.rawCells().length <= batch); assertTrue(result.rawCells().length <= batch);
} }
@ -470,7 +471,7 @@ public class TestPartialResultsFromClientSide {
do { do {
partialResult = partialScanner.next(); partialResult = partialScanner.next();
partials.add(partialResult); partials.add(partialResult);
} while (partialResult != null && partialResult.hasMoreCellsInRow()); } while (partialResult != null && partialResult.mayHaveMoreCellsInRow());
completeResult = Result.createCompleteResult(partials); completeResult = Result.createCompleteResult(partials);
oneShotResult = oneShotScanner.next(); oneShotResult = oneShotScanner.next();
@ -531,7 +532,7 @@ public class TestPartialResultsFromClientSide {
Result r = null; Result r = null;
while ((r = scanner.next()) != null) { while ((r = scanner.next()) != null) {
assertFalse(r.hasMoreCellsInRow()); assertFalse(r.mayHaveMoreCellsInRow());
} }
scanner.close(); scanner.close();
@ -581,7 +582,7 @@ public class TestPartialResultsFromClientSide {
// hit before the caching limit and thus partial results may be seen // hit before the caching limit and thus partial results may be seen
boolean expectToSeePartialResults = resultSizeRowLimit < cachingRowLimit; boolean expectToSeePartialResults = resultSizeRowLimit < cachingRowLimit;
while ((r = clientScanner.next()) != null) { while ((r = clientScanner.next()) != null) {
assertTrue(!r.hasMoreCellsInRow() || expectToSeePartialResults); assertTrue(!r.mayHaveMoreCellsInRow() || expectToSeePartialResults);
} }
scanner.close(); scanner.close();
@ -846,7 +847,7 @@ public class TestPartialResultsFromClientSide {
assertEquals(1, result1.rawCells().length); assertEquals(1, result1.rawCells().length);
Cell c1 = result1.rawCells()[0]; Cell c1 = result1.rawCells()[0];
assertCell(c1, ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); assertCell(c1, ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]);
assertFalse(result1.hasMoreCellsInRow()); assertFalse(result1.mayHaveMoreCellsInRow());
moveRegion(table, 2); moveRegion(table, 2);
@ -854,7 +855,7 @@ public class TestPartialResultsFromClientSide {
assertEquals(1, result2.rawCells().length); assertEquals(1, result2.rawCells().length);
Cell c2 = result2.rawCells()[0]; Cell c2 = result2.rawCells()[0];
assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]); assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]);
assertTrue(result2.hasMoreCellsInRow()); assertTrue(result2.mayHaveMoreCellsInRow());
moveRegion(table, 3); moveRegion(table, 3);
@ -862,7 +863,7 @@ public class TestPartialResultsFromClientSide {
assertEquals(1, result3.rawCells().length); assertEquals(1, result3.rawCells().length);
Cell c3 = result3.rawCells()[0]; Cell c3 = result3.rawCells()[0];
assertCell(c3, ROWS[1], FAMILIES[0], QUALIFIERS[1]); assertCell(c3, ROWS[1], FAMILIES[0], QUALIFIERS[1]);
assertTrue(result3.hasMoreCellsInRow()); assertTrue(result3.mayHaveMoreCellsInRow());
} }
@ -885,7 +886,7 @@ public class TestPartialResultsFromClientSide {
assertEquals(1, result1.rawCells().length); assertEquals(1, result1.rawCells().length);
Cell c1 = result1.rawCells()[0]; Cell c1 = result1.rawCells()[0];
assertCell(c1, ROWS[NUM_ROWS-1], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); assertCell(c1, ROWS[NUM_ROWS-1], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]);
assertFalse(result1.hasMoreCellsInRow()); assertFalse(result1.mayHaveMoreCellsInRow());
moveRegion(table, 2); moveRegion(table, 2);
@ -893,7 +894,7 @@ public class TestPartialResultsFromClientSide {
assertEquals(1, result2.rawCells().length); assertEquals(1, result2.rawCells().length);
Cell c2 = result2.rawCells()[0]; Cell c2 = result2.rawCells()[0];
assertCell(c2, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[0]); assertCell(c2, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[0]);
assertTrue(result2.hasMoreCellsInRow()); assertTrue(result2.mayHaveMoreCellsInRow());
moveRegion(table, 3); moveRegion(table, 3);
@ -901,7 +902,7 @@ public class TestPartialResultsFromClientSide {
assertEquals(1, result3.rawCells().length); assertEquals(1, result3.rawCells().length);
Cell c3 = result3.rawCells()[0]; Cell c3 = result3.rawCells()[0];
assertCell(c3, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[1]); assertCell(c3, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[1]);
assertTrue(result3.hasMoreCellsInRow()); assertTrue(result3.mayHaveMoreCellsInRow());
} }
@ -921,7 +922,7 @@ public class TestPartialResultsFromClientSide {
assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result1.rawCells().length); assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result1.rawCells().length);
Cell c1 = result1.rawCells()[0]; Cell c1 = result1.rawCells()[0];
assertCell(c1, ROWS[0], FAMILIES[0], QUALIFIERS[0]); assertCell(c1, ROWS[0], FAMILIES[0], QUALIFIERS[0]);
assertFalse(result1.hasMoreCellsInRow()); assertFalse(result1.mayHaveMoreCellsInRow());
moveRegion(table, 2); moveRegion(table, 2);
@ -929,7 +930,7 @@ public class TestPartialResultsFromClientSide {
assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result2.rawCells().length); assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result2.rawCells().length);
Cell c2 = result2.rawCells()[0]; Cell c2 = result2.rawCells()[0];
assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]); assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]);
assertFalse(result2.hasMoreCellsInRow()); assertFalse(result2.mayHaveMoreCellsInRow());
moveRegion(table, 3); moveRegion(table, 3);
@ -937,7 +938,7 @@ public class TestPartialResultsFromClientSide {
assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result3.rawCells().length); assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, result3.rawCells().length);
Cell c3 = result3.rawCells()[0]; Cell c3 = result3.rawCells()[0];
assertCell(c3, ROWS[2], FAMILIES[0], QUALIFIERS[0]); assertCell(c3, ROWS[2], FAMILIES[0], QUALIFIERS[0]);
assertFalse(result3.hasMoreCellsInRow()); assertFalse(result3.mayHaveMoreCellsInRow());
} }
@ -958,7 +959,7 @@ public class TestPartialResultsFromClientSide {
assertEquals(NUM_FAMILIES*NUM_QUALIFIERS, result1.rawCells().length); assertEquals(NUM_FAMILIES*NUM_QUALIFIERS, result1.rawCells().length);
Cell c1 = result1.rawCells()[0]; Cell c1 = result1.rawCells()[0];
assertCell(c1, ROWS[NUM_ROWS-1], FAMILIES[0], QUALIFIERS[0]); assertCell(c1, ROWS[NUM_ROWS-1], FAMILIES[0], QUALIFIERS[0]);
assertFalse(result1.hasMoreCellsInRow()); assertFalse(result1.mayHaveMoreCellsInRow());
moveRegion(table, 2); moveRegion(table, 2);
@ -966,7 +967,7 @@ public class TestPartialResultsFromClientSide {
assertEquals(NUM_FAMILIES*NUM_QUALIFIERS, result2.rawCells().length); assertEquals(NUM_FAMILIES*NUM_QUALIFIERS, result2.rawCells().length);
Cell c2 = result2.rawCells()[0]; Cell c2 = result2.rawCells()[0];
assertCell(c2, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[0]); assertCell(c2, ROWS[NUM_ROWS-2], FAMILIES[0], QUALIFIERS[0]);
assertFalse(result2.hasMoreCellsInRow()); assertFalse(result2.mayHaveMoreCellsInRow());
moveRegion(table, 3); moveRegion(table, 3);
@ -974,44 +975,85 @@ public class TestPartialResultsFromClientSide {
assertEquals(NUM_FAMILIES*NUM_QUALIFIERS, result3.rawCells().length); assertEquals(NUM_FAMILIES*NUM_QUALIFIERS, result3.rawCells().length);
Cell c3 = result3.rawCells()[0]; Cell c3 = result3.rawCells()[0];
assertCell(c3, ROWS[NUM_ROWS-3], FAMILIES[0], QUALIFIERS[0]); assertCell(c3, ROWS[NUM_ROWS-3], FAMILIES[0], QUALIFIERS[0]);
assertFalse(result3.hasMoreCellsInRow()); assertFalse(result3.mayHaveMoreCellsInRow());
} }
@Test @Test
public void testBatchingResultWhenRegionMove() throws IOException { public void testBatchingResultWhenRegionMove() throws IOException {
Table table = // If user setBatch(5) and rpc returns 3+5+5+5+3 cells,
createTestTable(TableName.valueOf("testBatchingResultWhenRegionMove"), ROWS, FAMILIES, // we should return 5+5+5+5+1 to user.
QUALIFIERS, VALUE); // setBatch doesn't mean setAllowPartialResult(true)
Table table = createTestTable(TableName.valueOf("testBatchingResultWhenRegionMove"),
ROWS, FAMILIES, QUALIFIERS, VALUE);
Put put = new Put(ROWS[1]);
put.addColumn(FAMILIES[0], QUALIFIERS[1], new byte[VALUE_SIZE * 10]);
table.put(put);
Delete delete = new Delete(ROWS[1]);
delete.addColumn(FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]);
table.delete(delete);
moveRegion(table, 1); moveRegion(table, 1);
Scan scan = new Scan(); Scan scan = new Scan();
scan.setCaching(1); scan.setCaching(1);
scan.setBatch(1); scan.setBatch(5);
scan.setMaxResultSize(VALUE_SIZE * 6);
ResultScanner scanner = table.getScanner(scan); ResultScanner scanner = table.getScanner(scan);
for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS - 1; i++) { for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS / 5 - 1; i++) {
scanner.next(); assertTrue(scanner.next().mayHaveMoreCellsInRow());
} }
Result result1 = scanner.next(); Result result1 = scanner.next();
assertEquals(1, result1.rawCells().length); assertEquals(5, result1.rawCells().length);
Cell c1 = result1.rawCells()[0]; assertCell(result1.rawCells()[0], ROWS[0], FAMILIES[NUM_FAMILIES - 1],
assertCell(c1, ROWS[0], FAMILIES[NUM_FAMILIES - 1], QUALIFIERS[NUM_QUALIFIERS - 1]); QUALIFIERS[NUM_QUALIFIERS - 5]);
assertCell(result1.rawCells()[4], ROWS[0], FAMILIES[NUM_FAMILIES - 1],
QUALIFIERS[NUM_QUALIFIERS - 1]);
assertFalse(result1.mayHaveMoreCellsInRow());
moveRegion(table, 2); moveRegion(table, 2);
Result result2 = scanner.next(); Result result2 = scanner.next();
assertEquals(1, result2.rawCells().length); assertEquals(5, result2.rawCells().length);
Cell c2 = result2.rawCells()[0]; assertCell(result2.rawCells()[0], ROWS[1], FAMILIES[0], QUALIFIERS[0]);
assertCell(c2, ROWS[1], FAMILIES[0], QUALIFIERS[0]); assertCell(result2.rawCells()[4], ROWS[1], FAMILIES[0], QUALIFIERS[4]);
assertTrue(result2.mayHaveMoreCellsInRow());
moveRegion(table, 3); moveRegion(table, 3);
Result result3 = scanner.next(); Result result3 = scanner.next();
assertEquals(1, result3.rawCells().length); assertEquals(5, result3.rawCells().length);
Cell c3 = result3.rawCells()[0]; assertCell(result3.rawCells()[0], ROWS[1], FAMILIES[0], QUALIFIERS[5]);
assertCell(c3, ROWS[1], FAMILIES[0], QUALIFIERS[1]); assertCell(result3.rawCells()[4], ROWS[1], FAMILIES[0], QUALIFIERS[9]);
assertTrue(result3.mayHaveMoreCellsInRow());
for (int i = 0; i < NUM_FAMILIES * NUM_QUALIFIERS / 5 - 3; i++) {
Result result = scanner.next();
assertEquals(5, result.rawCells().length);
assertTrue(result.mayHaveMoreCellsInRow());
}
Result result = scanner.next();
assertEquals(4, result.rawCells().length);
assertFalse(result.mayHaveMoreCellsInRow());
for (int i = 2; i < NUM_ROWS; i++) {
for (int j = 0; j < NUM_FAMILIES; j++) {
for (int k = 0; k < NUM_QUALIFIERS; k += 5) {
result = scanner.next();
assertCell(result.rawCells()[0], ROWS[i], FAMILIES[j], QUALIFIERS[k]);
assertEquals(5, result.rawCells().length);
if (j == NUM_FAMILIES - 1 && k == NUM_QUALIFIERS - 5) {
assertFalse(result.mayHaveMoreCellsInRow());
} else {
assertTrue(result.mayHaveMoreCellsInRow());
}
}
}
}
assertNull(scanner.next());
} }
@Test @Test

View File

@ -136,14 +136,14 @@ public class TestScannersFromClientSide2 {
private List<Result> assertAndCreateCompleteResults(List<Result> results) throws IOException { private List<Result> assertAndCreateCompleteResults(List<Result> results) throws IOException {
if ((!batch && !allowPartial) || (allowPartial && !batch && !smallResultSize)) { if ((!batch && !allowPartial) || (allowPartial && !batch && !smallResultSize)) {
for (Result result : results) { for (Result result : results) {
assertFalse("Should not have partial result", result.hasMoreCellsInRow()); assertFalse("Should not have partial result", result.mayHaveMoreCellsInRow());
} }
return results; return results;
} }
List<Result> completeResults = new ArrayList<>(); List<Result> completeResults = new ArrayList<>();
List<Result> partialResults = new ArrayList<>(); List<Result> partialResults = new ArrayList<>();
for (Result result : results) { for (Result result : results) {
if (!result.hasMoreCellsInRow()) { if (!result.mayHaveMoreCellsInRow()) {
assertFalse("Should have partial result", partialResults.isEmpty()); assertFalse("Should have partial result", partialResults.isEmpty());
partialResults.add(result); partialResults.add(result);
completeResults.add(Result.createCompleteResult(partialResults)); completeResults.add(Result.createCompleteResult(partialResults));