HBASE-17793 Backport ScanResultCache related code to branch-1

zhangduo 2017-03-16 15:25:56 +08:00
parent d542b446b8
commit 6be8d2041b
10 changed files with 858 additions and 277 deletions

@@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* A ScanResultCache that may return partial results.
* <p>
* Since we can only restart a scan from the beginning of a row after an error, this class also
* implements the logic that skips cells which have already been returned.
*/
@InterfaceAudience.Private
class AllowPartialScanResultCache implements ScanResultCache {
// used to filter out cells that have already been returned to the user, since we always
// restart from the beginning of a row on retry.
private Cell lastCell;
private void updateLastCell(Result result) {
lastCell = result.rawCells()[result.rawCells().length - 1];
}
@Override
public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
if (results.length == 0) {
return EMPTY_RESULT_ARRAY;
}
int i;
for (i = 0; i < results.length; i++) {
Result r = filterCells(results[i], lastCell);
if (r != null) {
results[i] = r;
break;
}
}
if (i == results.length) {
return EMPTY_RESULT_ARRAY;
}
updateLastCell(results[results.length - 1]);
if (i > 0) {
return Arrays.copyOfRange(results, i, results.length);
} else {
return results;
}
}
@Override
public void clear() {
// we do not cache anything
}
}
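
To make the skip-on-retry behavior concrete, here is a minimal usage sketch, not part of the commit. It assumes the example lives in the org.apache.hadoop.hbase.client package (the class is package-private) and uses made-up row and qualifier names:

package org.apache.hadoop.hbase.client;

import java.util.Arrays;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class AllowPartialExample {
  public static void main(String[] args) throws Exception {
    byte[] row = Bytes.toBytes("row1");
    byte[] cf = Bytes.toBytes("cf");
    Cell[] cells = new Cell[3];
    for (int i = 0; i < 3; i++) {
      cells[i] = new KeyValue(row, cf, Bytes.toBytes("cq" + i), Bytes.toBytes(i));
    }
    AllowPartialScanResultCache cache = new AllowPartialScanResultCache();
    // First RPC returns the first two cells of the row as a partial Result.
    Result[] first = cache.addAndGet(
      new Result[] { Result.create(Arrays.copyOf(cells, 2), null, false, true) }, false);
    System.out.println(first[0].rawCells().length); // 2; lastCell now points at cells[1]
    // A retry re-reads the row from the beginning and returns all three cells;
    // filterCells drops the two cells that were already handed to the user.
    Result[] retry = cache.addAndGet(
      new Result[] { Result.create(cells, null, false, false) }, false);
    System.out.println(retry[0].rawCells().length); // 1; only cells[2] is left
  }
}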

@@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
/**
* A scan result cache for batched scans, i.e.,
* {@code scan.getBatch() > 0 && !scan.getAllowPartialResults()}.
* <p>
* If the user calls setBatch(5) and an RPC returns 3+5+5+5+3 cells, we should return 5+5+5+5+1 to
* the user. setBatch does not imply setAllowPartialResults(true).
*/
@InterfaceAudience.Private
public class BatchScanResultCache implements ScanResultCache {
private final int batch;
// used to filter out cells that have already been returned to the user, since we always
// restart from the beginning of a row on retry.
private Cell lastCell;
private final Deque<Result> partialResults = new ArrayDeque<>();
private int numCellsOfPartialResults;
public BatchScanResultCache(int batch) {
this.batch = batch;
}
private void updateLastCell(Result result) {
lastCell = result.rawCells()[result.rawCells().length - 1];
}
private Result createCompletedResult() throws IOException {
Result result = Result.createCompleteResult(partialResults);
partialResults.clear();
numCellsOfPartialResults = 0;
return result;
}
// Add a new result to the partial list and return a batched Result if the cached cell count
// reaches the batch limit. As the RS also respects scan.getBatch, we can be sure that we will
// get at most one Result back (or null, which means we do not yet have enough cells).
private Result regroupResults(Result result) {
partialResults.addLast(result);
numCellsOfPartialResults += result.size();
if (numCellsOfPartialResults < batch) {
return null;
}
Cell[] cells = new Cell[batch];
int cellCount = 0;
boolean stale = false;
for (;;) {
Result r = partialResults.pollFirst();
stale = stale || r.isStale();
int newCellCount = cellCount + r.size();
if (newCellCount > batch) {
// We have more cells than expected, so split the current result
int len = batch - cellCount;
System.arraycopy(r.rawCells(), 0, cells, cellCount, len);
Cell[] remainingCells = new Cell[r.size() - len];
System.arraycopy(r.rawCells(), len, remainingCells, 0, r.size() - len);
partialResults.addFirst(
Result.create(remainingCells, r.getExists(), r.isStale(), r.mayHaveMoreCellsInRow()));
break;
}
System.arraycopy(r.rawCells(), 0, cells, cellCount, r.size());
if (newCellCount == batch) {
break;
}
cellCount = newCellCount;
}
numCellsOfPartialResults -= batch;
return Result.create(cells, null, stale,
result.mayHaveMoreCellsInRow() || !partialResults.isEmpty());
}
@Override
public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
if (results.length == 0) {
if (!partialResults.isEmpty() && !isHeartbeatMessage) {
return new Result[] { createCompletedResult() };
}
return EMPTY_RESULT_ARRAY;
}
List<Result> regroupedResults = new ArrayList<>();
for (Result result : results) {
result = filterCells(result, lastCell);
if (result == null) {
continue;
}
// check if we have a row change
if (!partialResults.isEmpty() &&
!Bytes.equals(partialResults.peek().getRow(), result.getRow())) {
regroupedResults.add(createCompletedResult());
}
Result regroupedResult = regroupResults(result);
if (regroupedResult != null) {
regroupedResults.add(regroupedResult);
// only update the last cell when we actually return it to the user.
updateLastCell(regroupedResult);
}
if (!result.mayHaveMoreCellsInRow() && !partialResults.isEmpty()) {
// We are done for this row
regroupedResults.add(createCompletedResult());
}
}
return regroupedResults.toArray(new Result[0]);
}
@Override
public void clear() {
partialResults.clear();
numCellsOfPartialResults = 0;
}
}
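
A minimal sketch, not part of the commit, of the regrouping described in the class comment: five server-side chunks of 3+5+5+5+3 cells come back to the caller as 5+5+5+5+1. Row and qualifier names are made up; the zero-padded qualifiers only keep the cells sorted.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.BatchScanResultCache;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class BatchRegroupExample {
  public static void main(String[] args) throws Exception {
    byte[] row = Bytes.toBytes("row1");
    byte[] cf = Bytes.toBytes("cf");
    // 21 cells in a single row.
    Cell[] cells = new Cell[21];
    for (int i = 0; i < cells.length; i++) {
      cells[i] =
        new KeyValue(row, cf, Bytes.toBytes(String.format("cq%02d", i)), Bytes.toBytes(i));
    }
    // Server-side chunks of 3+5+5+5+3 cells; only the last chunk ends the row.
    int[] sizes = { 3, 5, 5, 5, 3 };
    List<Result> rpcResults = new ArrayList<Result>();
    int offset = 0;
    for (int i = 0; i < sizes.length; i++) {
      boolean moreCellsInRow = i < sizes.length - 1;
      rpcResults.add(Result.create(Arrays.copyOfRange(cells, offset, offset + sizes[i]), null,
        false, moreCellsInRow));
      offset += sizes[i];
    }
    BatchScanResultCache cache = new BatchScanResultCache(5);
    for (Result r : cache.addAndGet(rpcResults.toArray(new Result[0]), false)) {
      System.out.println(r.size()); // prints 5, 5, 5, 5, 1
    }
  }
}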

@@ -17,16 +17,15 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
import static org.apache.hadoop.hbase.client.ConnectionUtils.numberOfIndividualRows;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang.mutable.MutableBoolean;
@@ -34,13 +33,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue.MetaComparator;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
@@ -69,23 +66,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
protected HRegionInfo currentRegion = null;
protected ScannerCallableWithReplicas callable = null;
protected final LinkedList<Result> cache = new LinkedList<Result>();
/**
* A list of partial results that have been returned from the server. This list should only
* contain results if this scanner does not have enough partial results to form the complete
* result.
*/
protected final LinkedList<Result> partialResults = new LinkedList<Result>();
protected int partialResultsCellSizes = 0;
/**
* The row for which we are accumulating partial Results (i.e. the row of the Results stored
* inside partialResults). Changes to partialResultsRow and partialResults are kept in sync via
* the methods {@link #regroupResults(Result)} and {@link #clearPartialResults()}
*/
protected byte[] partialResultsRow = null;
/**
* The last cell from a not full Row which is added to cache
*/
protected Cell lastCellLoadedToCache = null;
private final ScanResultCache scanResultCache;
protected final int caching;
protected long lastNext;
// Keep lastResult returned successfully in case we have to reset scanner.
@@ -106,7 +87,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
protected final int primaryOperationTimeout;
private int retries;
protected final ExecutorService pool;
private static MetaComparator metaComparator = new MetaComparator();
/**
* Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
@@ -159,6 +139,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
this.rpcControllerFactory = controllerFactory;
this.conf = conf;
this.scanResultCache = createScanResultCache(scan);
}
protected ClusterConnection getConnection() {
@@ -361,14 +342,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
private void closeScannerIfExhausted(boolean exhausted) throws IOException {
if (exhausted) {
if (!partialResults.isEmpty()) {
// XXX: continue if there are partial results. But in fact server should not set
// hasMoreResults to false if there are partial results.
LOG.warn("Server tells us there is no more results for this region but we still have" +
" partialResults, this should not happen, retry on the current scanner anyway");
} else {
closeScanner();
}
closeScanner();
}
}
@@ -376,7 +350,7 @@
MutableBoolean retryAfterOutOfOrderException, int retriesLeft) throws DoNotRetryIOException {
// An exception was thrown which makes any partial results that we were collecting
// invalid. The scanner will need to be reset to the beginning of a row.
clearPartialResults();
scanResultCache.clear();
// Unfortunately, DNRIOE is used in two different semantics.
// (1) The first is to close the client scanner and bubble up the exception all the way
@@ -470,7 +444,7 @@
if (callable.switchedToADifferentReplica()) {
// Any accumulated partial results are no longer valid since the callable will
// openScanner with the correct startkey and we must pick up from there
clearPartialResults();
scanResultCache.clear();
this.currentRegion = callable.getHRegionInfo();
}
retryAfterOutOfOrderException.setValue(true);
@@ -490,28 +464,20 @@
// Groom the array of Results that we received back from the server before adding those
// Results to the scanner's cache. If partial results are not allowed to be seen by the
// caller, all bookkeeping will be performed within this method.
List<Result> resultsToAddToCache =
getResultsToAddToCache(values, callable.isHeartbeatMessage());
if (!resultsToAddToCache.isEmpty()) {
Result[] resultsToAddToCache =
scanResultCache.addAndGet(values, callable.isHeartbeatMessage());
if (resultsToAddToCache.length > 0) {
for (Result rs : resultsToAddToCache) {
rs = filterLoadedCell(rs);
if (rs == null) {
continue;
}
cache.add(rs);
for (Cell cell : rs.rawCells()) {
remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
}
countdown--;
this.lastResult = rs;
if (this.lastResult.mayHaveMoreCellsInRow()) {
updateLastCellLoadedToCache(this.lastResult);
} else {
this.lastCellLoadedToCache = null;
}
}
if (scan.getLimit() > 0 && !resultsToAddToCache.isEmpty()) {
int newLimit = scan.getLimit() - numberOfIndividualRows(resultsToAddToCache);
if (scan.getLimit() > 0) {
int newLimit =
scan.getLimit() - numberOfIndividualRows(Arrays.asList(resultsToAddToCache));
assert newLimit >= 0;
scan.setLimit(newLimit);
}
@@ -554,13 +520,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
}
// we are done with the current region
if (regionExhausted) {
if (!partialResults.isEmpty()) {
// XXX: continue if there are partial results. But in fact server should not set
// hasMoreResults to false if there are partial results.
LOG.warn("Server tells us there is no more results for this region but we still have" +
" partialResults, this should not happen, retry on the current scanner anyway");
continue;
}
if (!moveToNextRegion()) {
break;
}
@@ -568,141 +527,6 @@
}
}
/**
* This method ensures all of our book keeping regarding partial results is kept up to date. This
* method should be called once we know that the results we received back from the RPC request do
* not contain errors. We return a list of results that should be added to the cache. In general,
* this list will contain all NON-partial results from the input array (unless the client has
* specified that they are okay with receiving partial results)
* @param resultsFromServer The array of {@link Result}s returned from the server
* @param heartbeatMessage Flag indicating whether or not the response received from the server
* represented a complete response, or a heartbeat message that was sent to keep the
* client-server connection alive
* @return the list of results that should be added to the cache.
* @throws IOException
*/
protected List<Result> getResultsToAddToCache(Result[] resultsFromServer,
boolean heartbeatMessage) throws IOException {
int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize);
// If the caller has indicated in their scan that they are okay with seeing partial results,
// then simply add all results to the list. Note allowPartial and setBatch are not same, we can
// return here if allow partials and we will handle batching later.
if (scan.getAllowPartialResults()) {
addResultsToList(resultsToAddToCache, resultsFromServer, 0,
(null == resultsFromServer ? 0 : resultsFromServer.length));
return resultsToAddToCache;
}
// If no results were returned it indicates that either we have the all the partial results
// necessary to construct the complete result or the server had to send a heartbeat message
// to the client to keep the client-server connection alive
if (resultsFromServer == null || resultsFromServer.length == 0) {
// If this response was an empty heartbeat message, then we have not exhausted the region
// and thus there may be more partials server side that still need to be added to the partial
// list before we form the complete Result
if (!partialResults.isEmpty() && !heartbeatMessage) {
resultsToAddToCache.add(Result.createCompleteResult(partialResults));
clearPartialResults();
}
return resultsToAddToCache;
}
for(Result result : resultsFromServer) {
if (partialResultsRow != null && Bytes.compareTo(result.getRow(), partialResultsRow) != 0) {
// We have a new row, complete the previous row.
resultsToAddToCache.add(Result.createCompleteResult(partialResults));
clearPartialResults();
}
Result res = regroupResults(result);
if (res != null) {
resultsToAddToCache.add(res);
}
if (!result.mayHaveMoreCellsInRow()) {
// We are done for this row
if (partialResultsCellSizes > 0) {
resultsToAddToCache.add(Result.createCompleteResult(partialResults));
}
clearPartialResults();
}
}
return resultsToAddToCache;
}
/**
* Add new result to the partial list and return a batched Result if caching size exceed
* batching limit.
* If user setBatch(5) and rpc returns 3+5+5+5+3 cells, we should return 5+5+5+5+1 to user.
* setBatch doesn't mean setAllowPartialResult(true)
* @param result The result that we want to add to our list of partial Results
* @return the result if we have batch limit and there is one Result can be returned to user, or
* null if we have not.
* @throws IOException
*/
private Result regroupResults(final Result result) throws IOException {
partialResultsRow = result.getRow();
partialResults.add(result);
partialResultsCellSizes += result.size();
if (scan.getBatch() > 0 && partialResultsCellSizes >= scan.getBatch()) {
Cell[] cells = new Cell[scan.getBatch()];
int count = 0;
boolean stale = false;
while (count < scan.getBatch()) {
Result res = partialResults.poll();
stale = stale || res.isStale();
if (res.size() + count <= scan.getBatch()) {
System.arraycopy(res.rawCells(), 0, cells, count, res.size());
count += res.size();
} else {
int len = scan.getBatch() - count;
System.arraycopy(res.rawCells(), 0, cells, count, len);
Cell[] remainingCells = new Cell[res.size() - len];
System.arraycopy(res.rawCells(), len, remainingCells, 0, res.size() - len);
Result remainingRes = Result.create(remainingCells, res.getExists(), res.isStale(),
res.mayHaveMoreCellsInRow());
partialResults.addFirst(remainingRes);
count = scan.getBatch();
}
}
partialResultsCellSizes -= scan.getBatch();
if (partialResultsCellSizes == 0) {
// We have nothing in partialResults, clear the flags to prevent returning empty Result
// when next result belongs to the next row.
clearPartialResults();
}
return Result.create(cells, null, stale,
partialResultsCellSizes > 0 || result.mayHaveMoreCellsInRow());
}
return null;
}
/**
* Convenience method for clearing the list of partials and resetting the partialResultsRow.
*/
private void clearPartialResults() {
partialResults.clear();
partialResultsCellSizes = 0;
partialResultsRow = null;
}
/**
* Helper method for adding results between the indices [start, end) to the outputList
* @param outputList the list that results will be added to
* @param inputArray the array that results are taken from
* @param start beginning index (inclusive)
* @param end ending index (exclusive)
*/
private void addResultsToList(List<Result> outputList, Result[] inputArray, int start, int end) {
if (inputArray == null || start < 0 || end > inputArray.length) return;
for (int i = start; i < end; i++) {
outputList.add(inputArray[i]);
}
}
@Override
public void close() {
if (!scanMetricsPublished) writeScanMetrics();
@@ -739,58 +563,4 @@ public abstract class ClientScanner extends AbstractClientScanner {
}
return false;
}
protected void updateLastCellLoadedToCache(Result result) {
if (result.rawCells().length == 0) {
return;
}
this.lastCellLoadedToCache = result.rawCells()[result.rawCells().length - 1];
}
/**
* Compare two Cells considering reversed scanner. ReversedScanner only reverses rows, not
* columns.
*/
private int compare(Cell a, Cell b) {
int r = 0;
if (currentRegion != null && currentRegion.isMetaRegion()) {
r = metaComparator.compareRows(a, b);
} else {
r = CellComparator.compareRows(a, b);
}
if (r != 0) {
return this.scan.isReversed() ? -r : r;
}
return CellComparator.compareWithoutRow(a, b);
}
private Result filterLoadedCell(Result result) {
// we only filter result when last result is partial
// so lastCellLoadedToCache and result should have same row key.
// However, if 1) read some cells; 1.1) delete this row at the same time 2) move region;
// 3) read more cell. lastCellLoadedToCache and result will be not at same row.
if (lastCellLoadedToCache == null || result.rawCells().length == 0) {
return result;
}
if (compare(this.lastCellLoadedToCache, result.rawCells()[0]) < 0) {
// The first cell of this result is larger than the last cell of loadcache.
// If user do not allow partial result, it must be true.
return result;
}
if (compare(this.lastCellLoadedToCache, result.rawCells()[result.rawCells().length - 1]) >= 0) {
// The last cell of this result is smaller than the last cell of loadcache, skip all.
return null;
}
// The first one must not in filtered result, we start at the second.
int index = 1;
while (index < result.rawCells().length) {
if (compare(this.lastCellLoadedToCache, result.rawCells()[index]) < 0) {
break;
}
index++;
}
Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length);
return Result.create(list, result.getExists(), result.isStale(), result.mayHaveMoreCellsInRow());
}
}

@@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
/**
* A scan result cache that only returns complete results.
*/
@InterfaceAudience.Private
class CompleteScanResultCache implements ScanResultCache {
private final List<Result> partialResults = new ArrayList<>();
private Result combine() throws IOException {
Result result = Result.createCompleteResult(partialResults);
partialResults.clear();
return result;
}
private Result[] prependCombined(Result[] results, int length) throws IOException {
if (length == 0) {
return new Result[] { combine() };
}
// the last part of a partial result may not be marked as partial, so here we need to check
// whether there is a row change.
int start;
if (Bytes.equals(partialResults.get(0).getRow(), results[0].getRow())) {
partialResults.add(results[0]);
start = 1;
length--;
} else {
start = 0;
}
Result[] prependResults = new Result[length + 1];
prependResults[0] = combine();
System.arraycopy(results, start, prependResults, 1, length);
return prependResults;
}
@Override
public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
// If no results were returned it indicates that either we have all the partial results
// necessary to construct the complete result or the server had to send a heartbeat message
// to the client to keep the client-server connection alive
if (results.length == 0) {
// If this response was an empty heartbeat message, then we have not exhausted the region
// and thus there may be more partials server side that still need to be added to the partial
// list before we form the complete Result
if (!partialResults.isEmpty() && !isHeartbeatMessage) {
return new Result[] { combine() };
}
return EMPTY_RESULT_ARRAY;
}
// In every RPC response there should be at most a single partial result. Furthermore, if
// there is a partial result, it is guaranteed to be in the last position of the array.
Result last = results[results.length - 1];
if (last.mayHaveMoreCellsInRow()) {
if (partialResults.isEmpty()) {
partialResults.add(last);
return Arrays.copyOf(results, results.length - 1);
}
// We have only one result and it is partial
if (results.length == 1) {
// check if there is a row change
if (Bytes.equals(partialResults.get(0).getRow(), last.getRow())) {
partialResults.add(last);
return EMPTY_RESULT_ARRAY;
}
Result completeResult = combine();
partialResults.add(last);
return new Result[] { completeResult };
}
// We have some complete results
Result[] resultsToReturn = prependCombined(results, results.length - 1);
partialResults.add(last);
return resultsToReturn;
}
if (!partialResults.isEmpty()) {
return prependCombined(results, results.length);
}
return results;
}
@Override
public void clear() {
partialResults.clear();
}
}
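
A minimal sketch, not part of the commit, of the hold-and-combine behavior. It must live in the org.apache.hadoop.hbase.client package because the class is package-private; the row and qualifier names are made up:

package org.apache.hadoop.hbase.client;

import java.util.Arrays;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class CompleteCacheExample {
  private static final byte[] CF = Bytes.toBytes("cf");

  private static Cell cell(String row, String cq) {
    return new KeyValue(Bytes.toBytes(row), CF, Bytes.toBytes(cq), Bytes.toBytes(1));
  }

  public static void main(String[] args) throws Exception {
    CompleteScanResultCache cache = new CompleteScanResultCache();
    // Two partial Results for row1: nothing is returned to the caller yet.
    Result p1 = Result.create(Arrays.asList(cell("row1", "cq1")), null, false, true);
    Result p2 = Result.create(Arrays.asList(cell("row1", "cq2")), null, false, true);
    System.out.println(cache.addAndGet(new Result[] { p1 }, false).length); // 0
    System.out.println(cache.addAndGet(new Result[] { p2 }, false).length); // 0
    // A Result for row2 proves that row1 is finished, so the combined row1 is
    // prepended to the output.
    Result next = Result.create(Arrays.asList(cell("row2", "cq1")), null, false, false);
    Result[] out = cache.addAndGet(new Result[] { next }, false);
    System.out.println(out.length); // 2: complete row1, then row2
    System.out.println(out[0].rawCells().length); // 2
  }
}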

@@ -24,12 +24,16 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
@@ -239,6 +243,40 @@
return Bytes.equals(row, EMPTY_END_ROW);
}
private static final Comparator<Cell> COMPARE_WITHOUT_ROW = new Comparator<Cell>() {
@Override
public int compare(Cell o1, Cell o2) {
return CellComparator.compareWithoutRow(o1, o2);
}
};
static Result filterCells(Result result, Cell keepCellsAfter) {
if (keepCellsAfter == null) {
// do not need to filter
return result;
}
// not the same row
if (!CellUtil.matchingRow(keepCellsAfter, result.getRow(), 0, result.getRow().length)) {
return result;
}
Cell[] rawCells = result.rawCells();
int index = Arrays.binarySearch(rawCells, keepCellsAfter, COMPARE_WITHOUT_ROW);
if (index < 0) {
index = -index - 1;
} else {
index++;
}
if (index == 0) {
return result;
}
if (index == rawCells.length) {
return null;
}
return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null,
result.isStale(), result.mayHaveMoreCellsInRow());
}
static boolean noMoreResultsForScan(Scan scan, HRegionInfo info) {
if (isEmptyStopRow(info.getEndKey())) {
return true;
@@ -287,4 +325,14 @@
}
return count;
}
public static ScanResultCache createScanResultCache(Scan scan) {
if (scan.getAllowPartialResults()) {
return new AllowPartialScanResultCache();
} else if (scan.getBatch() > 0) {
return new BatchScanResultCache(scan.getBatch());
} else {
return new CompleteScanResultCache();
}
}
}
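
The factory above is the only decision point. A small sketch, not part of the commit and placed in the org.apache.hadoop.hbase.client package since ScanResultCache is package-private, showing which implementation each Scan configuration selects:

package org.apache.hadoop.hbase.client;

public class ScanResultCacheSelection {
  public static void main(String[] args) {
    // allowPartialResults wins over batch: AllowPartialScanResultCache.
    Scan s1 = new Scan();
    s1.setAllowPartialResults(true);
    s1.setBatch(5);
    // batch > 0 without allowPartialResults: BatchScanResultCache.
    Scan s2 = new Scan();
    s2.setBatch(5);
    // Neither: CompleteScanResultCache, which only emits whole rows.
    Scan s3 = new Scan();
    for (Scan scan : new Scan[] { s1, s2, s3 }) {
      System.out.println(ConnectionUtils.createScanResultCache(scan).getClass().getSimpleName());
    }
  }
}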

@@ -24,7 +24,9 @@ import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -34,7 +36,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -857,44 +858,42 @@ public class Result implements CellScannable, CellScanner {
* @throws IOException A complete result cannot be formed because the results in the partial list
* come from different rows
*/
public static Result createCompleteResult(List<Result> partialResults)
public static Result createCompleteResult(Iterable<Result> partialResults)
throws IOException {
List<Cell> cells = new ArrayList<Cell>();
if (partialResults == null) {
return Result.create(Collections.<Cell> emptyList(), null, false);
}
List<Cell> cells = new ArrayList<>();
boolean stale = false;
byte[] prevRow = null;
byte[] currentRow = null;
if (partialResults != null && !partialResults.isEmpty()) {
for (int i = 0; i < partialResults.size(); i++) {
Result r = partialResults.get(i);
currentRow = r.getRow();
if (prevRow != null && !Bytes.equals(prevRow, currentRow)) {
throw new IOException(
"Cannot form complete result. Rows of partial results do not match." +
" Partial Results: " + partialResults);
}
// Ensure that all Results except the last one are marked as partials. The last result
// may not be marked as a partial because Results are only marked as partials when
// the scan on the server side must be stopped due to reaching the maxResultSize.
// Visualizing it makes it easier to understand:
// maxResultSize: 2 cells
// (-x-) represents cell number x in a row
// Example: row1: -1- -2- -3- -4- -5- (5 cells total)
// How row1 will be returned by the server as partial Results:
// Result1: -1- -2- (2 cells, size limit reached, mark as partial)
// Result2: -3- -4- (2 cells, size limit reached, mark as partial)
// Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial)
if (i != (partialResults.size() - 1) && !r.mayHaveMoreCellsInRow()) {
throw new IOException(
"Cannot form complete result. Result is missing partial flag. " +
"Partial Results: " + partialResults);
}
prevRow = currentRow;
stale = stale || r.isStale();
for (Cell c : r.rawCells()) {
cells.add(c);
}
for (Iterator<Result> iter = partialResults.iterator(); iter.hasNext();) {
Result r = iter.next();
currentRow = r.getRow();
if (prevRow != null && !Bytes.equals(prevRow, currentRow)) {
throw new IOException(
"Cannot form complete result. Rows of partial results do not match." +
" Partial Results: " + partialResults);
}
// Ensure that all Results except the last one are marked as partials. The last result
// may not be marked as a partial because Results are only marked as partials when
// the scan on the server side must be stopped due to reaching the maxResultSize.
// Visualizing it makes it easier to understand:
// maxResultSize: 2 cells
// (-x-) represents cell number x in a row
// Example: row1: -1- -2- -3- -4- -5- (5 cells total)
// How row1 will be returned by the server as partial Results:
// Result1: -1- -2- (2 cells, size limit reached, mark as partial)
// Result2: -3- -4- (2 cells, size limit reached, mark as partial)
// Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial)
if (iter.hasNext() && !r.mayHaveMoreCellsInRow()) {
throw new IOException("Cannot form complete result. Result is missing partial flag. " +
"Partial Results: " + partialResults);
}
prevRow = currentRow;
stale = stale || r.isStale();
for (Cell c : r.rawCells()) {
cells.add(c);
}
}
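
The visualization above maps directly to code. A hedged sketch, not part of the commit, with made-up names, showing three partial Results for one row being stitched back together:

import java.util.Arrays;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class CreateCompleteResultExample {
  public static void main(String[] args) throws Exception {
    byte[] row = Bytes.toBytes("row1");
    byte[] cf = Bytes.toBytes("cf");
    Cell[] cells = new Cell[5];
    for (int i = 0; i < 5; i++) {
      cells[i] = new KeyValue(row, cf, Bytes.toBytes("cq" + i), Bytes.toBytes(i));
    }
    // maxResultSize of 2 cells: -1- -2- | -3- -4- | -5-; only the last piece is
    // not marked as partial.
    Result part1 = Result.create(Arrays.copyOfRange(cells, 0, 2), null, false, true);
    Result part2 = Result.create(Arrays.copyOfRange(cells, 2, 4), null, false, true);
    Result part3 = Result.create(Arrays.copyOfRange(cells, 4, 5), null, false, false);
    Result whole = Result.createCompleteResult(Arrays.asList(part1, part2, part3));
    System.out.println(whole.rawCells().length); // 5
  }
}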

@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Used to separate the row-constructing logic.
* <p>
* Since heartbeat support was added for scans, the RS may return a partial result even if
* allowPartialResults is false and batch is 0. With this interface, the implementation now looks
* like:
* <ol>
* <li>Get results from ScanResponse proto.</li>
* <li>Pass them to ScanResultCache and get something back.</li>
* <li>If we actually get something back, then pass it to ScanObserver.</li>
* </ol>
*/
@InterfaceAudience.Private
interface ScanResultCache {
static final Result[] EMPTY_RESULT_ARRAY = new Result[0];
/**
* Add the given results to the cache and get valid results back.
* @param results the results of a scan next. Must not be null.
* @param isHeartbeatMessage indicates whether the results came from a heartbeat response.
* @return valid results, never null.
*/
Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException;
/**
* Clear the cached results, if any. Called on scan error, when we will restart from the
* beginning of a row.
*/
void clear();
}
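
A minimal sketch, not part of the commit, of the three-step flow from the javadoc, with a plain List standing in for the consumer (in this commit it is ClientScanner that adds the groomed Results to its cache). It must live in the org.apache.hadoop.hbase.client package because the interface is package-private:

package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.List;

public class ScanResultCacheFlow {
  // Steps 2 and 3 from the javadoc: pass the raw ScanResponse results through
  // the cache, and only forward what actually comes back out.
  static void onScanResponse(ScanResultCache cache, Result[] fromServer, boolean isHeartbeat,
      List<Result> consumer) throws IOException {
    for (Result r : cache.addAndGet(fromServer, isHeartbeat)) {
      consumer.add(r);
    }
  }
}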

@@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.TestBatchScanResultCache.createCells;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ SmallTests.class, ClientTests.class })
public class TestAllowPartialScanResultCache {
private static byte[] CF = Bytes.toBytes("cf");
private AllowPartialScanResultCache resultCache;
@Before
public void setUp() {
resultCache = new AllowPartialScanResultCache();
}
@After
public void tearDown() {
resultCache.clear();
resultCache = null;
}
@Test
public void test() throws IOException {
assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false));
assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true));
Cell[] cells1 = createCells(CF, 1, 10);
Cell[] cells2 = createCells(CF, 2, 10);
Result[] results1 = resultCache.addAndGet(
new Result[] { Result.create(Arrays.copyOf(cells1, 5), null, false, true) }, false);
assertEquals(1, results1.length);
assertEquals(1, Bytes.toInt(results1[0].getRow()));
assertEquals(5, results1[0].rawCells().length);
for (int i = 0; i < 5; i++) {
assertEquals(1, Bytes.toInt(results1[0].getValue(CF, Bytes.toBytes("cq" + i))));
}
Result[] results2 = resultCache.addAndGet(
new Result[] { Result.create(Arrays.copyOfRange(cells1, 1, 10), null, false, true) }, false);
assertEquals(1, results2.length);
assertEquals(1, Bytes.toInt(results2[0].getRow()));
assertEquals(5, results2[0].rawCells().length);
for (int i = 5; i < 10; i++) {
assertEquals(1, Bytes.toInt(results2[0].getValue(CF, Bytes.toBytes("cq" + i))));
}
Result[] results3 =
resultCache.addAndGet(new Result[] { Result.create(cells1), Result.create(cells2) }, false);
assertEquals(1, results3.length);
assertEquals(2, Bytes.toInt(results3[0].getRow()));
assertEquals(10, results3[0].rawCells().length);
for (int i = 0; i < 10; i++) {
assertEquals(2, Bytes.toInt(results3[0].getValue(CF, Bytes.toBytes("cq" + i))));
}
}
}

@@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ SmallTests.class, ClientTests.class })
public class TestBatchScanResultCache {
private static byte[] CF = Bytes.toBytes("cf");
private BatchScanResultCache resultCache;
@Before
public void setUp() {
resultCache = new BatchScanResultCache(4);
}
@After
public void tearDown() {
resultCache.clear();
resultCache = null;
}
static Cell createCell(byte[] cf, int key, int cq) {
return new KeyValue(Bytes.toBytes(key), cf, Bytes.toBytes("cq" + cq), Bytes.toBytes(key));
}
static Cell[] createCells(byte[] cf, int key, int numCqs) {
Cell[] cells = new Cell[numCqs];
for (int i = 0; i < numCqs; i++) {
cells[i] = createCell(cf, key, i);
}
return cells;
}
private void assertResultEquals(Result result, int key, int start, int to) {
assertEquals(to - start, result.size());
for (int i = start; i < to; i++) {
assertEquals(key, Bytes.toInt(result.getValue(CF, Bytes.toBytes("cq" + i))));
}
assertEquals(to - start == 4, result.mayHaveMoreCellsInRow());
}
@Test
public void test() throws IOException {
assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false));
assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true));
Cell[] cells1 = createCells(CF, 1, 10);
Cell[] cells2 = createCells(CF, 2, 10);
Cell[] cells3 = createCells(CF, 3, 10);
assertEquals(0, resultCache.addAndGet(
new Result[] { Result.create(Arrays.copyOf(cells1, 3), null, false, true) }, false).length);
Result[] results = resultCache.addAndGet(
new Result[] { Result.create(Arrays.copyOfRange(cells1, 3, 7), null, false, true),
Result.create(Arrays.copyOfRange(cells1, 7, 10), null, false, true) },
false);
assertEquals(2, results.length);
assertResultEquals(results[0], 1, 0, 4);
assertResultEquals(results[1], 1, 4, 8);
results = resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false);
assertEquals(1, results.length);
assertResultEquals(results[0], 1, 8, 10);
results = resultCache.addAndGet(
new Result[] { Result.create(Arrays.copyOfRange(cells2, 0, 4), null, false, true),
Result.create(Arrays.copyOfRange(cells2, 4, 8), null, false, true),
Result.create(Arrays.copyOfRange(cells2, 8, 10), null, false, true),
Result.create(Arrays.copyOfRange(cells3, 0, 4), null, false, true),
Result.create(Arrays.copyOfRange(cells3, 4, 8), null, false, true),
Result.create(Arrays.copyOfRange(cells3, 8, 10), null, false, false) },
false);
assertEquals(6, results.length);
assertResultEquals(results[0], 2, 0, 4);
assertResultEquals(results[1], 2, 4, 8);
assertResultEquals(results[2], 2, 8, 10);
assertResultEquals(results[3], 3, 0, 4);
assertResultEquals(results[4], 3, 4, 8);
assertResultEquals(results[5], 3, 8, 10);
}
}

@@ -0,0 +1,182 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ SmallTests.class, ClientTests.class })
public class TestCompleteResultScanResultCache {
private static byte[] CF = Bytes.toBytes("cf");
private static byte[] CQ1 = Bytes.toBytes("cq1");
private static byte[] CQ2 = Bytes.toBytes("cq2");
private static byte[] CQ3 = Bytes.toBytes("cq3");
private CompleteScanResultCache resultCache;
@Before
public void setUp() {
resultCache = new CompleteScanResultCache();
}
@After
public void tearDown() {
resultCache.clear();
resultCache = null;
}
private static Cell createCell(int key, byte[] cq) {
return new KeyValue(Bytes.toBytes(key), CF, cq, Bytes.toBytes(key));
}
@Test
public void testNoPartial() throws IOException {
assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false));
assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true));
int count = 10;
Result[] results = new Result[count];
for (int i = 0; i < count; i++) {
results[i] = Result.create(Arrays.asList(createCell(i, CQ1)));
}
assertSame(results, resultCache.addAndGet(results, false));
}
@Test
public void testCombine1() throws IOException {
Result previousResult = Result.create(Arrays.asList(createCell(0, CQ1)), null, false, true);
Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true);
Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true);
Result result3 = Result.create(Arrays.asList(createCell(1, CQ3)), null, false, true);
Result[] results = resultCache.addAndGet(new Result[] { previousResult, result1 }, false);
assertEquals(1, results.length);
assertSame(previousResult, results[0]);
assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length);
assertEquals(0, resultCache.addAndGet(new Result[] { result3 }, false).length);
assertEquals(0, resultCache.addAndGet(new Result[0], true).length);
results = resultCache.addAndGet(new Result[0], false);
assertEquals(1, results.length);
assertEquals(1, Bytes.toInt(results[0].getRow()));
assertEquals(3, results[0].rawCells().length);
assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1)));
assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2)));
assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ3)));
}
@Test
public void testCombine2() throws IOException {
Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true);
Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true);
Result result3 = Result.create(Arrays.asList(createCell(1, CQ3)), null, false, true);
Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, true);
Result nextToNextResult1 = Result.create(Arrays.asList(createCell(3, CQ2)), null, false, false);
assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length);
assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length);
assertEquals(0, resultCache.addAndGet(new Result[] { result3 }, false).length);
Result[] results = resultCache.addAndGet(new Result[] { nextResult1 }, false);
assertEquals(1, results.length);
assertEquals(1, Bytes.toInt(results[0].getRow()));
assertEquals(3, results[0].rawCells().length);
assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1)));
assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2)));
assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ3)));
results = resultCache.addAndGet(new Result[] { nextToNextResult1 }, false);
assertEquals(2, results.length);
assertEquals(2, Bytes.toInt(results[0].getRow()));
assertEquals(1, results[0].rawCells().length);
assertEquals(2, Bytes.toInt(results[0].getValue(CF, CQ1)));
assertEquals(3, Bytes.toInt(results[1].getRow()));
assertEquals(1, results[1].rawCells().length);
assertEquals(3, Bytes.toInt(results[1].getValue(CF, CQ2)));
}
@Test
public void testCombine3() throws IOException {
Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true);
Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, true);
Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, false);
Result nextToNextResult1 = Result.create(Arrays.asList(createCell(3, CQ1)), null, false, true);
assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length);
assertEquals(0, resultCache.addAndGet(new Result[] { result2 }, false).length);
Result[] results =
resultCache.addAndGet(new Result[] { nextResult1, nextToNextResult1 }, false);
assertEquals(2, results.length);
assertEquals(1, Bytes.toInt(results[0].getRow()));
assertEquals(2, results[0].rawCells().length);
assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1)));
assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2)));
assertEquals(2, Bytes.toInt(results[1].getRow()));
assertEquals(1, results[1].rawCells().length);
assertEquals(2, Bytes.toInt(results[1].getValue(CF, CQ1)));
results = resultCache.addAndGet(new Result[0], false);
assertEquals(1, results.length);
assertEquals(3, Bytes.toInt(results[0].getRow()));
assertEquals(1, results[0].rawCells().length);
assertEquals(3, Bytes.toInt(results[0].getValue(CF, CQ1)));
}
@Test
public void testCombine4() throws IOException {
Result result1 = Result.create(Arrays.asList(createCell(1, CQ1)), null, false, true);
Result result2 = Result.create(Arrays.asList(createCell(1, CQ2)), null, false, false);
Result nextResult1 = Result.create(Arrays.asList(createCell(2, CQ1)), null, false, true);
Result nextResult2 = Result.create(Arrays.asList(createCell(2, CQ2)), null, false, false);
assertEquals(0, resultCache.addAndGet(new Result[] { result1 }, false).length);
Result[] results = resultCache.addAndGet(new Result[] { result2, nextResult1 }, false);
assertEquals(1, results.length);
assertEquals(1, Bytes.toInt(results[0].getRow()));
assertEquals(2, results[0].rawCells().length);
assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ1)));
assertEquals(1, Bytes.toInt(results[0].getValue(CF, CQ2)));
results = resultCache.addAndGet(new Result[] { nextResult2 }, false);
assertEquals(1, results.length);
assertEquals(2, Bytes.toInt(results[0].getRow()));
assertEquals(2, results[0].rawCells().length);
assertEquals(2, Bytes.toInt(results[0].getValue(CF, CQ1)));
assertEquals(2, Bytes.toInt(results[0].getValue(CF, CQ2)));
}
}