HBASE-17583 Add inclusive/exclusive support for startRow and endRow of scan for sync client

This commit is contained in:
zhangduo 2017-02-07 15:24:23 +08:00
parent e019961150
commit c8f3c8630b
14 changed files with 246 additions and 385 deletions

View File

@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.util.Threads;
* This is defined in the method {@link ClientAsyncPrefetchScanner#prefetchCondition()}. * This is defined in the method {@link ClientAsyncPrefetchScanner#prefetchCondition()}.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class ClientAsyncPrefetchScanner extends ClientScanner { public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
private static final int ESTIMATED_SINGLE_RESULT_SIZE = 1024; private static final int ESTIMATED_SINGLE_RESULT_SIZE = 1024;
private static final int DEFAULT_QUEUE_CAPACITY = 1024; private static final int DEFAULT_QUEUE_CAPACITY = 1024;

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
@ -64,7 +62,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
private static final Log LOG = LogFactory.getLog(ClientScanner.class); private static final Log LOG = LogFactory.getLog(ClientScanner.class);
protected Scan scan; protected final Scan scan;
protected boolean closed = false; protected boolean closed = false;
// Current region scanner is against. Gets cleared if current region goes // Current region scanner is against. Gets cleared if current region goes
// wonky: e.g. if it splits on us. // wonky: e.g. if it splits on us.
@ -162,8 +160,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
initCache(); initCache();
} }
protected abstract void initCache();
protected ClusterConnection getConnection() { protected ClusterConnection getConnection() {
return this.connection; return this.connection;
} }
@ -209,89 +205,71 @@ public abstract class ClientScanner extends AbstractClientScanner {
return maxScannerResultSize; return maxScannerResultSize;
} }
// returns true if the passed region endKey private void closeScanner() throws IOException {
protected boolean checkScanStopRow(final byte[] endKey) {
if (this.scan.getStopRow().length > 0) {
// there is a stop row, check to see if we are past it.
byte[] stopRow = scan.getStopRow();
int cmp = Bytes.compareTo(stopRow, 0, stopRow.length, endKey, 0, endKey.length);
if (cmp <= 0) {
// stopRow <= endKey (endKey is equals to or larger than stopRow)
// This is a stop.
return true;
}
}
return false; // unlikely.
}
protected final void closeScanner() throws IOException {
if (this.callable != null) { if (this.callable != null) {
this.callable.setClose(); this.callable.setClose();
call(callable, caller, scannerTimeout); call(callable, caller, scannerTimeout, false);
this.callable = null; this.callable = null;
} }
} }
/** /**
* Gets a scanner for the next region. If this.currentRegion != null, then we will move to the * Will be called in moveToNextRegion when currentRegion is null. Abstract because for normal
* endrow of this.currentRegion. Else we will get scanner at the scan.getStartRow(). * scan, we will start next scan from the endKey of the currentRegion, and for reversed scan, we
* @param nbRows the caching option of the scan * will start next scan from the startKey of the currentRegion.
* @return the results fetched when open scanner, or null which means terminate the scan. * @return {@code false} if we have reached the stop row. Otherwise {@code true}.
*/ */
protected Result[] nextScanner(int nbRows) throws IOException { protected abstract boolean setNewStartKey();
/**
* Will be called in moveToNextRegion to create ScannerCallable. Abstract because for reversed
* scan we need to create a ReversedScannerCallable.
*/
protected abstract ScannerCallable createScannerCallable();
/**
* Close the previous scanner and create a new ScannerCallable for the next scanner.
* <p>
* Marked as protected only because TestClientScanner need to override this method.
* @return false if we should terminate the scan. Otherwise
*/
@VisibleForTesting
protected boolean moveToNextRegion() {
// Close the previous scanner if it's open // Close the previous scanner if it's open
try {
closeScanner(); closeScanner();
} catch (IOException e) {
// Where to start the next scanner // not a big deal continue
byte[] localStartKey; if (LOG.isDebugEnabled()) {
LOG.debug("close scanner for " + currentRegion + " failed", e);
// if we're at end of table, close and return null to stop iterating
if (this.currentRegion != null) {
byte[] endKey = this.currentRegion.getEndKey();
if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
checkScanStopRow(endKey)) {
close();
if (LOG.isTraceEnabled()) {
LOG.trace("Finished " + this.currentRegion);
} }
return null;
} }
localStartKey = endKey; if (currentRegion != null) {
// clear mvcc read point if we are going to switch regions if (!setNewStartKey()) {
return false;
}
scan.resetMvccReadPoint(); scan.resetMvccReadPoint();
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Finished " + this.currentRegion); LOG.trace("Finished " + this.currentRegion);
} }
} else {
localStartKey = this.scan.getStartRow();
} }
if (LOG.isDebugEnabled() && this.currentRegion != null) { if (LOG.isDebugEnabled() && this.currentRegion != null) {
// Only worth logging if NOT first region in scan. // Only worth logging if NOT first region in scan.
LOG.debug( LOG.debug(
"Advancing internal scanner to startKey at '" + Bytes.toStringBinary(localStartKey) + "'"); "Advancing internal scanner to startKey at '" + Bytes.toStringBinary(scan.getStartRow()) +
"', " + (scan.includeStartRow() ? "inclusive" : "exclusive"));
} }
try { // clear the current region, we will set a new value to it after the first call of the new
callable = getScannerCallable(localStartKey, nbRows); // callable.
// Open a scanner on the region server starting at the this.currentRegion = null;
// beginning of the region this.callable =
Result[] rrs = call(callable, caller, scannerTimeout); new ScannerCallableWithReplicas(getTable(), getConnection(), createScannerCallable(), pool,
this.currentRegion = callable.getHRegionInfo(); primaryOperationTimeout, scan, getRetries(), scannerTimeout, caching, conf, caller);
this.callable.setCaching(this.caching);
if (this.scanMetrics != null) { if (this.scanMetrics != null) {
this.scanMetrics.countOfRegions.incrementAndGet(); this.scanMetrics.countOfRegions.incrementAndGet();
} }
if (rrs != null && rrs.length == 0 && callable.moreResultsForScan() == MoreResults.NO) { return true;
// no results for the scan, return null to terminate the scan.
closed = true;
callable = null;
currentRegion = null;
return null;
}
return rrs;
} catch (IOException e) {
closeScanner();
throw e;
}
} }
@VisibleForTesting @VisibleForTesting
@ -300,24 +278,17 @@ public abstract class ClientScanner extends AbstractClientScanner {
} }
private Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller<Result[]> caller, private Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller<Result[]> caller,
int scannerTimeout) throws IOException { int scannerTimeout, boolean updateCurrentRegion) throws IOException {
if (Thread.interrupted()) { if (Thread.interrupted()) {
throw new InterruptedIOException(); throw new InterruptedIOException();
} }
// callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas, // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
// we do a callWithRetries // we do a callWithRetries
return caller.callWithoutRetries(callable, scannerTimeout); Result[] rrs = caller.callWithoutRetries(callable, scannerTimeout);
if (currentRegion == null && updateCurrentRegion) {
currentRegion = callable.getHRegionInfo();
} }
return rrs;
@InterfaceAudience.Private
protected ScannerCallableWithReplicas getScannerCallable(byte[] localStartKey, int nbRows) {
scan.setStartRow(localStartKey);
ScannerCallable s = new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
this.rpcControllerFactory);
s.setCaching(nbRows);
ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(), s,
pool, primaryOperationTimeout, scan, retries, scannerTimeout, caching, conf, caller);
return sr;
} }
/** /**
@ -367,9 +338,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
} }
private boolean scanExhausted(Result[] values) { private boolean scanExhausted(Result[] values) {
// This means the server tells us the whole scan operation is done. Usually decided by filter or return callable.moreResultsForScan() == MoreResults.NO;
// limit.
return values == null || callable.moreResultsForScan() == MoreResults.NO;
} }
private boolean regionExhausted(Result[] values) { private boolean regionExhausted(Result[] values) {
@ -377,8 +346,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
// old time we always return empty result for a open scanner operation so we add a check here to // old time we always return empty result for a open scanner operation so we add a check here to
// keep compatible with the old logic. Should remove the isOpenScanner in the future. // keep compatible with the old logic. Should remove the isOpenScanner in the future.
// 2. Server tells us that it has no more results for this region. // 2. Server tells us that it has no more results for this region.
return (values.length == 0 && !callable.isHeartbeatMessage() && !callable.isOpenScanner()) return (values.length == 0 && !callable.isHeartbeatMessage()) ||
|| callable.moreResultsInRegion() == MoreResults.NO; callable.moreResultsInRegion() == MoreResults.NO;
} }
private void closeScannerIfExhausted(boolean exhausted) throws IOException { private void closeScannerIfExhausted(boolean exhausted) throws IOException {
@ -386,25 +355,14 @@ public abstract class ClientScanner extends AbstractClientScanner {
if (!partialResults.isEmpty()) { if (!partialResults.isEmpty()) {
// XXX: continue if there are partial results. But in fact server should not set // XXX: continue if there are partial results. But in fact server should not set
// hasMoreResults to false if there are partial results. // hasMoreResults to false if there are partial results.
LOG.warn("Server tells us there is no more results for this region but we still have" LOG.warn("Server tells us there is no more results for this region but we still have" +
+ " partialResults, this should not happen, retry on the current scanner anyway"); " partialResults, this should not happen, retry on the current scanner anyway");
} else { } else {
closeScanner(); closeScanner();
} }
} }
} }
private Result[] nextScannerWithRetries(int nbRows) throws IOException {
int retriesLeft = getRetries();
for (;;) {
try {
return nextScanner(nbRows);
} catch (DoNotRetryIOException e) {
handleScanError(e, null, retriesLeft--);
}
}
}
private void handleScanError(DoNotRetryIOException e, private void handleScanError(DoNotRetryIOException e,
MutableBoolean retryAfterOutOfOrderException, int retriesLeft) throws DoNotRetryIOException { MutableBoolean retryAfterOutOfOrderException, int retriesLeft) throws DoNotRetryIOException {
// An exception was thrown which makes any partial results that we were collecting // An exception was thrown which makes any partial results that we were collecting
@ -444,20 +402,12 @@ public abstract class ClientScanner extends AbstractClientScanner {
// The region has moved. We need to open a brand new scanner at the new location. // The region has moved. We need to open a brand new scanner at the new location.
// Reset the startRow to the row we've seen last so that the new scanner starts at // Reset the startRow to the row we've seen last so that the new scanner starts at
// the correct row. Otherwise we may see previously returned rows again. // the correct row. Otherwise we may see previously returned rows again.
// (ScannerCallable by now has "relocated" the correct region) // If the lastRow is not partial, then we should start from the next row. As now we can
if (!this.lastResult.isPartial() && scan.getBatch() < 0) { // exclude the start row, the logic here is the same for both normal scan and reversed scan.
if (scan.isReversed()) { // If lastResult is partial then include it, otherwise exclude it.
scan.setStartRow(createClosestRowBefore(lastResult.getRow())); scan.withStartRow(lastResult.getRow(), lastResult.isPartial() || scan.getBatch() > 0);
} else {
scan.setStartRow(createClosestRowAfter(lastResult.getRow()));
}
} else {
// we need rescan this row because we only loaded partial row before
scan.setStartRow(lastResult.getRow());
}
} }
if (e instanceof OutOfOrderScannerNextException) { if (e instanceof OutOfOrderScannerNextException) {
if (retryAfterOutOfOrderException != null) {
if (retryAfterOutOfOrderException.isTrue()) { if (retryAfterOutOfOrderException.isTrue()) {
retryAfterOutOfOrderException.setValue(false); retryAfterOutOfOrderException.setValue(false);
} else { } else {
@ -466,7 +416,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
"Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?", e); "Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?", e);
} }
} }
}
// Clear region. // Clear region.
this.currentRegion = null; this.currentRegion = null;
// Set this to zero so we don't try and do an rpc and close on remote server when // Set this to zero so we don't try and do an rpc and close on remote server when
@ -482,18 +431,14 @@ public abstract class ClientScanner extends AbstractClientScanner {
if (closed) { if (closed) {
return; return;
} }
Result[] values = null;
long remainingResultSize = maxScannerResultSize; long remainingResultSize = maxScannerResultSize;
int countdown = this.caching; int countdown = this.caching;
// This is possible if we just stopped at the boundary of a region in the previous call. // This is possible if we just stopped at the boundary of a region in the previous call.
if (callable == null) { if (callable == null) {
values = nextScannerWithRetries(countdown); if (!moveToNextRegion()) {
if (values == null) {
return; return;
} }
} }
// We need to reset it if it's a new callable that was created with a countdown in nextScanner
callable.setCaching(this.caching);
// This flag is set when we want to skip the result returned. We do // This flag is set when we want to skip the result returned. We do
// this when we reset scanner because it split under us. // this when we reset scanner because it split under us.
MutableBoolean retryAfterOutOfOrderException = new MutableBoolean(true); MutableBoolean retryAfterOutOfOrderException = new MutableBoolean(true);
@ -501,15 +446,14 @@ public abstract class ClientScanner extends AbstractClientScanner {
// make sure that we are not retrying indefinitely. // make sure that we are not retrying indefinitely.
int retriesLeft = getRetries(); int retriesLeft = getRetries();
for (;;) { for (;;) {
Result[] values;
try { try {
// Server returns a null values if scanning is to stop. Else, // Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just // returns an empty array if scanning is to go on and we've just
// exhausted current region. // exhausted current region.
// now we will also fetch data when openScanner, so do not make a next call again if values // now we will also fetch data when openScanner, so do not make a next call again if values
// is already non-null. // is already non-null.
if (values == null) { values = call(callable, caller, scannerTimeout, true);
values = call(callable, caller, scannerTimeout);
}
// When the replica switch happens, we need to do certain operations again. // When the replica switch happens, we need to do certain operations again.
// The callable will openScanner with the right startkey but we need to pick up // The callable will openScanner with the right startkey but we need to pick up
// from there. Bypass the rest of the loop and let the catch-up happen in the beginning // from there. Bypass the rest of the loop and let the catch-up happen in the beginning
@ -519,19 +463,12 @@ public abstract class ClientScanner extends AbstractClientScanner {
// openScanner with the correct startkey and we must pick up from there // openScanner with the correct startkey and we must pick up from there
clearPartialResults(); clearPartialResults();
this.currentRegion = callable.getHRegionInfo(); this.currentRegion = callable.getHRegionInfo();
// Now we will also fetch data when openScanner so usually we should not get a null
// result, but at some places we still use null to indicate the scan is terminated, so add
// a sanity check here. Should be removed later.
if (values == null) {
continue;
}
} }
retryAfterOutOfOrderException.setValue(true); retryAfterOutOfOrderException.setValue(true);
} catch (DoNotRetryIOException e) { } catch (DoNotRetryIOException e) {
handleScanError(e, retryAfterOutOfOrderException, retriesLeft--); handleScanError(e, retryAfterOutOfOrderException, retriesLeft--);
// reopen the scanner // reopen the scanner
values = nextScannerWithRetries(countdown); if (!moveToNextRegion()) {
if (values == null) {
break; break;
} }
continue; continue;
@ -584,8 +521,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
// loop until a limit (e.g. size or caching) is reached, break out early to avoid causing // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing
// unnecesary delays to the caller // unnecesary delays to the caller
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Heartbeat message received and cache contains Results." LOG.trace("Heartbeat message received and cache contains Results." +
+ " Breaking out of scan loop"); " Breaking out of scan loop");
} }
// we know that the region has not been exhausted yet so just break without calling // we know that the region has not been exhausted yet so just break without calling
// closeScannerIfExhausted // closeScannerIfExhausted
@ -612,17 +549,13 @@ public abstract class ClientScanner extends AbstractClientScanner {
if (!partialResults.isEmpty()) { if (!partialResults.isEmpty()) {
// XXX: continue if there are partial results. But in fact server should not set // XXX: continue if there are partial results. But in fact server should not set
// hasMoreResults to false if there are partial results. // hasMoreResults to false if there are partial results.
LOG.warn("Server tells us there is no more results for this region but we still have" LOG.warn("Server tells us there is no more results for this region but we still have" +
+ " partialResults, this should not happen, retry on the current scanner anyway"); " partialResults, this should not happen, retry on the current scanner anyway");
values = null; // reset values for the next call
continue; continue;
} }
values = nextScannerWithRetries(countdown); if (!moveToNextRegion()) {
if (values == null) {
break; break;
} }
} else {
values = null; // reset values for the next call
} }
} }
} }
@ -772,9 +705,9 @@ public abstract class ClientScanner extends AbstractClientScanner {
private void addToPartialResults(final Result result) throws IOException { private void addToPartialResults(final Result result) throws IOException {
final byte[] row = result.getRow(); final byte[] row = result.getRow();
if (partialResultsRow != null && !Bytes.equals(row, partialResultsRow)) { if (partialResultsRow != null && !Bytes.equals(row, partialResultsRow)) {
throw new IOException("Partial result row does not match. All partial results must come " throw new IOException("Partial result row does not match. All partial results must come " +
+ "from the same row. partialResultsRow: " + Bytes.toString(partialResultsRow) + "row: " "from the same row. partialResultsRow: " + Bytes.toString(partialResultsRow) + "row: " +
+ Bytes.toString(row)); Bytes.toString(row));
} }
partialResultsRow = row; partialResultsRow = row;
partialResults.add(result); partialResults.add(result);
@ -809,7 +742,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
if (callable != null) { if (callable != null) {
callable.setClose(); callable.setClose();
try { try {
call(callable, caller, scannerTimeout); call(callable, caller, scannerTimeout, false);
} catch (UnknownScannerException e) { } catch (UnknownScannerException e) {
// We used to catch this error, interpret, and rethrow. However, we // We used to catch this error, interpret, and rethrow. However, we
// have since decided that it's not nice for a scanner's close to // have since decided that it's not nice for a scanner's close to
@ -893,4 +826,13 @@ public abstract class ClientScanner extends AbstractClientScanner {
Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length); Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length);
return Result.create(list, result.getExists(), result.isStale(), result.isPartial()); return Result.create(list, result.getExists(), result.isStale(), result.isPartial());
} }
protected void initCache() {
initSyncCache();
}
@Override
public Result next() throws IOException {
return nextWithSyncCache();
}
} }

View File

@ -17,14 +17,18 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
/** /**
* ClientSimpleScanner implements a sync scanner behaviour. * ClientSimpleScanner implements a sync scanner behaviour.
* The cache is a simple list. * The cache is a simple list.
@ -41,12 +45,22 @@ public class ClientSimpleScanner extends ClientScanner {
} }
@Override @Override
protected void initCache() { protected boolean setNewStartKey() {
initSyncCache(); if (noMoreResultsForScan(scan, currentRegion)) {
return false;
}
scan.withStartRow(currentRegion.getEndKey(), true);
return true;
} }
@Override @Override
public Result next() throws IOException { protected ScannerCallable createScannerCallable() {
return nextWithSyncCache(); if (!scan.includeStartRow() && !isEmptyStartRow(scan.getStartRow())) {
// we have not implemented locate to next row for sync client yet, so here we change the
// inclusive of start row to true.
scan.withStartRow(createClosestRowAfter(scan.getStartRow()), true);
}
return new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
this.rpcControllerFactory);
} }
} }

View File

@ -787,7 +787,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
Scan s = new Scan(); Scan s = new Scan();
s.setReversed(true); s.setReversed(true);
s.setStartRow(metaKey); s.withStartRow(metaKey);
s.addFamily(HConstants.CATALOG_FAMILY); s.addFamily(HConstants.CATALOG_FAMILY);
s.setOneRowLimit(); s.setOneRowLimit();
if (this.useMetaReplicas) { if (this.useMetaReplicas) {

View File

@ -248,12 +248,9 @@ public final class ConnectionUtils {
} }
/** /**
* Create the closest row before the specified row * Create a row before the specified row and very close to the specified row.
* @deprecated in fact, we do not know the closest row before the given row, the result is only a
* row very close to the current row. Avoid using this method in the future.
*/ */
@Deprecated static byte[] createCloseRowBefore(byte[] row) {
static byte[] createClosestRowBefore(byte[] row) {
if (row.length == 0) { if (row.length == 0) {
return MAX_BYTE_ARRAY; return MAX_BYTE_ARRAY;
} }

View File

@ -18,32 +18,25 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore; import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ExceptionUtil;
/** /**
* A reversed client scanner which support backward scanning * A reversed client scanner which support backward scanning
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class ReversedClientScanner extends ClientSimpleScanner { public class ReversedClientScanner extends ClientScanner {
private static final Log LOG = LogFactory.getLog(ReversedClientScanner.class);
/** /**
* Create a new ReversibleClientScanner for the specified table Note that the * Create a new ReversibleClientScanner for the specified table Note that the passed
* passed {@link Scan}'s start row maybe changed. * {@link Scan}'s start row maybe changed.
* @param conf * @param conf
* @param scan * @param scan
* @param tableName * @param tableName
@ -52,111 +45,26 @@ public class ReversedClientScanner extends ClientSimpleScanner {
* @param primaryOperationTimeout * @param primaryOperationTimeout
* @throws IOException * @throws IOException
*/ */
public ReversedClientScanner(Configuration conf, Scan scan, public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName,
TableName tableName, ClusterConnection connection, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
ExecutorService pool, int primaryOperationTimeout) throws IOException { throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
primaryOperationTimeout); primaryOperationTimeout);
} }
@Override @Override
protected Result[] nextScanner(int nbRows) throws IOException { protected boolean setNewStartKey() {
// Close the previous scanner if it's open if (noMoreResultsForReverseScan(scan, currentRegion)) {
closeScanner(); return false;
// Where to start the next scanner
byte[] localStartKey;
boolean locateTheClosestFrontRow = true;
// if we're at start of table, close and return false to stop iterating
if (this.currentRegion != null) {
byte[] startKey = this.currentRegion.getStartKey();
if (startKey == null || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY)
|| checkScanStopRow(startKey)) {
close();
if (LOG.isDebugEnabled()) {
LOG.debug("Finished " + this.currentRegion);
} }
return null; scan.withStartRow(currentRegion.getStartKey(), false);
} return true;
localStartKey = startKey;
// clear mvcc read point if we are going to switch regions
scan.resetMvccReadPoint();
if (LOG.isDebugEnabled()) {
LOG.debug("Finished " + this.currentRegion);
}
} else {
localStartKey = this.scan.getStartRow();
if (!Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY)) {
locateTheClosestFrontRow = false;
}
}
if (LOG.isDebugEnabled() && this.currentRegion != null) {
// Only worth logging if NOT first region in scan.
LOG.debug("Advancing internal scanner to startKey at '"
+ Bytes.toStringBinary(localStartKey) + "'");
}
try {
// In reversed scan, we want to locate the previous region through current
// region's start key. In order to get that previous region, first we
// create a closest row before the start key of current region, then
// locate all the regions from the created closest row to start key of
// current region, thus the last one of located regions should be the
// previous region of current region. The related logic of locating
// regions is implemented in ReversedScannerCallable
byte[] locateStartRow = locateTheClosestFrontRow ? createClosestRowBefore(localStartKey)
: null;
callable = getScannerCallable(localStartKey, nbRows, locateStartRow);
// Open a scanner on the region server starting at the
// beginning of the region
// callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
// we do a callWithRetries
Result[] rrs = this.caller.callWithoutRetries(callable, scannerTimeout);
this.currentRegion = callable.getHRegionInfo();
if (this.scanMetrics != null) {
this.scanMetrics.countOfRegions.incrementAndGet();
}
if (rrs != null && rrs.length == 0 && callable.moreResultsForScan() == MoreResults.NO) {
// no results for the scan, return null to terminate the scan.
return null;
}
return rrs;
} catch (IOException e) {
ExceptionUtil.rethrowIfInterrupt(e);
close();
throw e;
}
}
protected ScannerCallableWithReplicas getScannerCallable(byte[] localStartKey,
int nbRows, byte[] locateStartRow) {
scan.setStartRow(localStartKey);
ScannerCallable s =
new ReversedScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
locateStartRow, this.rpcControllerFactory);
s.setCaching(nbRows);
ScannerCallableWithReplicas sr =
new ScannerCallableWithReplicas(getTable(), getConnection(), s, pool,
primaryOperationTimeout, scan, getRetries(), getScannerTimeout(), caching, getConf(),
caller);
return sr;
} }
@Override @Override
// returns true if stopRow >= passed region startKey protected ReversedScannerCallable createScannerCallable() {
protected boolean checkScanStopRow(final byte[] startKey) { return new ReversedScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
if (this.scan.getStopRow().length > 0) { this.rpcControllerFactory);
// there is a stop row, check to see if we are past it.
byte[] stopRow = scan.getStopRow();
int cmp = Bytes.compareTo(stopRow, 0, stopRow.length, startKey, 0,
startKey.length);
if (cmp >= 0) {
// stopRow >= startKey (stopRow is equals to or larger than endKey)
// This is a stop.
return true;
}
}
return false; // unlikely.
} }
} }

View File

@ -18,6 +18,9 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -39,26 +42,18 @@ import org.apache.hadoop.hbase.util.Bytes;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class ReversedScannerCallable extends ScannerCallable { public class ReversedScannerCallable extends ScannerCallable {
/**
* The start row for locating regions. In reversed scanner, may locate the
* regions for a range of keys when doing
* {@link ReversedClientScanner#nextScanner(int)}
*/
protected final byte[] locateStartRow;
/** /**
* @param connection * @param connection
* @param tableName * @param tableName
* @param scan * @param scan
* @param scanMetrics * @param scanMetrics
* @param locateStartRow The start row for locating regions * @param rpcFactory to create an {@link com.google.protobuf.RpcController} to talk to the
* @param rpcFactory to create an {@link com.google.protobuf.RpcController} * regionserver
* to talk to the regionserver
*/ */
public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan, public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory rpcFactory) { ScanMetrics scanMetrics, RpcControllerFactory rpcFactory) {
super(connection, tableName, scan, scanMetrics, rpcFactory); super(connection, tableName, scan, scanMetrics, rpcFactory);
this.locateStartRow = locateStartRow;
} }
/** /**
@ -66,28 +61,13 @@ public class ReversedScannerCallable extends ScannerCallable {
* @param tableName * @param tableName
* @param scan * @param scan
* @param scanMetrics * @param scanMetrics
* @param locateStartRow The start row for locating regions * @param rpcFactory to create an {@link com.google.protobuf.RpcController} to talk to the
* @param rpcFactory to create an {@link com.google.protobuf.RpcController} * regionserver
* to talk to the regionserver
* @param replicaId the replica id * @param replicaId the replica id
*/ */
public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan, public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory rpcFactory, ScanMetrics scanMetrics, RpcControllerFactory rpcFactory, int replicaId) {
int replicaId) {
super(connection, tableName, scan, scanMetrics, rpcFactory, replicaId); super(connection, tableName, scan, scanMetrics, rpcFactory, replicaId);
this.locateStartRow = locateStartRow;
}
/**
* @deprecated use
* {@link #ReversedScannerCallable(ClusterConnection, TableName, Scan,
* ScanMetrics, byte[], RpcControllerFactory )}
*/
@Deprecated
public ReversedScannerCallable(ClusterConnection connection, TableName tableName,
Scan scan, ScanMetrics scanMetrics, byte[] locateStartRow) {
this(connection, tableName, scan, scanMetrics, locateStartRow, RpcControllerFactory
.instantiate(connection.getConfiguration()));
} }
/** /**
@ -100,12 +80,15 @@ public class ReversedScannerCallable extends ScannerCallable {
throw new InterruptedIOException(); throw new InterruptedIOException();
} }
if (!instantiated || reload) { if (!instantiated || reload) {
if (locateStartRow == null) { // we should use range locate if
// 1. we do not want the start row
// 2. the start row is empty which means we need to locate to the last region.
if (scan.includeStartRow() && !isEmptyStartRow(getRow())) {
// Just locate the region with the row // Just locate the region with the row
RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(reload, id, RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(reload, id,
getConnection(), getTableName(), getRow()); getConnection(), getTableName(), getRow());
this.location = id < rl.size() ? rl.getRegionLocation(id) : null; this.location = id < rl.size() ? rl.getRegionLocation(id) : null;
if (this.location == null) { if (location == null || location.getServerName() == null) {
throw new IOException("Failed to find location, tableName=" throw new IOException("Failed to find location, tableName="
+ getTableName() + ", row=" + Bytes.toStringBinary(getRow()) + ", reload=" + getTableName() + ", row=" + Bytes.toStringBinary(getRow()) + ", reload="
+ reload); + reload);
@ -113,6 +96,7 @@ public class ReversedScannerCallable extends ScannerCallable {
} else { } else {
// Need to locate the regions with the range, and the target location is // Need to locate the regions with the range, and the target location is
// the last one which is the previous region of last region scanner // the last one which is the previous region of last region scanner
byte[] locateStartRow = createCloseRowBefore(getRow());
List<HRegionLocation> locatedRegions = locateRegionsInRange( List<HRegionLocation> locatedRegions = locateRegionsInRange(
locateStartRow, getRow(), reload); locateStartRow, getRow(), reload);
if (locatedRegions.isEmpty()) { if (locatedRegions.isEmpty()) {
@ -177,7 +161,7 @@ public class ReversedScannerCallable extends ScannerCallable {
@Override @Override
public ScannerCallable getScannerCallableForReplica(int id) { public ScannerCallable getScannerCallableForReplica(int id) {
ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), getTableName(), ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), getTableName(),
this.getScan(), this.scanMetrics, this.locateStartRow, rpcControllerFactory, id); this.getScan(), this.scanMetrics, rpcControllerFactory, id);
r.setCaching(this.getCaching()); r.setCaching(this.getCaching());
return r; return r;
} }

View File

@ -67,7 +67,7 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
protected boolean instantiated = false; protected boolean instantiated = false;
protected boolean closed = false; protected boolean closed = false;
protected boolean renew = false; protected boolean renew = false;
private Scan scan; protected final Scan scan;
private int caching = 1; private int caching = 1;
protected ScanMetrics scanMetrics; protected ScanMetrics scanMetrics;
private boolean logScannerActivity = false; private boolean logScannerActivity = false;
@ -82,7 +82,6 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
private MoreResults moreResultsInRegion; private MoreResults moreResultsInRegion;
private MoreResults moreResultsForScan; private MoreResults moreResultsForScan;
private boolean openScanner;
/** /**
* Saves whether or not the most recent response from the server was a heartbeat message. * Saves whether or not the most recent response from the server was a heartbeat message.
* Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()} * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()}
@ -253,10 +252,8 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
} }
ScanResponse response; ScanResponse response;
if (this.scannerId == -1L) { if (this.scannerId == -1L) {
this.openScanner = true;
response = openScanner(); response = openScanner();
} else { } else {
this.openScanner = false;
response = next(); response = next();
} }
long timestamp = System.currentTimeMillis(); long timestamp = System.currentTimeMillis();
@ -469,12 +466,4 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
void setMoreResultsForScan(MoreResults moreResults) { void setMoreResultsForScan(MoreResults moreResults) {
this.moreResultsForScan = moreResults; this.moreResultsForScan = moreResults;
} }
/**
* Whether the previous call is openScanner. This is used to keep compatible with the old
* implementation that we always returns empty result for openScanner.
*/
boolean isOpenScanner() {
return openScanner;
}
} }

View File

@ -18,9 +18,6 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import java.io.IOException; import java.io.IOException;
@ -122,10 +119,6 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
return currentScannerCallable.moreResultsForScan(); return currentScannerCallable.moreResultsForScan();
} }
public boolean isOpenScanner() {
return currentScannerCallable.isOpenScanner();
}
@Override @Override
public Result [] call(int timeout) throws IOException { public Result [] call(int timeout) throws IOException {
// If the active replica callable was closed somewhere, invoke the RPC to // If the active replica callable was closed somewhere, invoke the RPC to
@ -321,24 +314,17 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
* @param callable The callable to set the start row on * @param callable The callable to set the start row on
*/ */
private void setStartRowForReplicaCallable(ScannerCallable callable) { private void setStartRowForReplicaCallable(ScannerCallable callable) {
if (this.lastResult == null || callable == null) return; if (this.lastResult == null || callable == null) {
return;
if (this.lastResult.isPartial()) { }
// The last result was a partial result which means we have not received all of the cells // 1. The last result was a partial result which means we have not received all of the cells
// for this row. Thus, use the last result's row as the start row. If a replica switch // for this row. Thus, use the last result's row as the start row. If a replica switch
// occurs, the scanner will ensure that any accumulated partial results are cleared, // occurs, the scanner will ensure that any accumulated partial results are cleared,
// and the scan can resume from this row. // and the scan can resume from this row.
callable.getScan().setStartRow(this.lastResult.getRow()); // 2. The last result was not a partial result which means it contained all of the cells for
} else {
// The last result was not a partial result which means it contained all of the cells for
// that row (we no longer need any information from it). Set the start row to the next // that row (we no longer need any information from it). Set the start row to the next
// closest row that could be seen. // closest row that could be seen.
if (callable.getScan().isReversed()) { callable.getScan().withStartRow(this.lastResult.getRow(), this.lastResult.isPartial());
callable.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
} else {
callable.getScan().setStartRow(createClosestRowAfter(this.lastResult.getRow()));
}
}
} }
@VisibleForTesting @VisibleForTesting

View File

@ -90,7 +90,7 @@ public class TestClientScanner {
} }
} }
private static class MockClientScanner extends ClientScanner { private static class MockClientScanner extends ClientSimpleScanner {
private boolean rpcFinished = false; private boolean rpcFinished = false;
private boolean rpcFinishedFired = false; private boolean rpcFinishedFired = false;
@ -105,50 +105,26 @@ public class TestClientScanner {
} }
@Override @Override
protected Result[] nextScanner(int nbRows) throws IOException { protected boolean moveToNextRegion() {
if (!initialized) { if (!initialized) {
initialized = true; initialized = true;
return super.nextScanner(nbRows); return super.moveToNextRegion();
} }
if (!rpcFinished) { if (!rpcFinished) {
return super.nextScanner(nbRows); return super.moveToNextRegion();
} }
// Enforce that we don't short-circuit more than once // Enforce that we don't short-circuit more than once
if (rpcFinishedFired) { if (rpcFinishedFired) {
throw new RuntimeException("Expected nextScanner to only be called once after " + throw new RuntimeException("Expected nextScanner to only be called once after " +
" short-circuit was triggered."); " short-circuit was triggered.");
} }
rpcFinishedFired = true; rpcFinishedFired = true;
return null; return false;
}
@Override
protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey,
int nbRows) {
scan.setStartRow(localStartKey);
ScannerCallable s =
new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
this.rpcControllerFactory);
s.setCaching(nbRows);
ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(getTable(), getConnection(),
s, pool, primaryOperationTimeout, scan,
getRetries(), scannerTimeout, caching, conf, caller);
return sr;
} }
public void setRpcFinished(boolean rpcFinished) { public void setRpcFinished(boolean rpcFinished) {
this.rpcFinished = rpcFinished; this.rpcFinished = rpcFinished;
} }
@Override
protected void initCache() {
initSyncCache();
}
@Override public Result next() throws IOException {
return nextWithSyncCache();
}
} }
@Test @Test
@ -177,7 +153,7 @@ public class TestClientScanner {
case 1: // detect no more results case 1: // detect no more results
case 2: // close case 2: // close
count++; count++;
return null; return new Result[0];
default: default:
throw new RuntimeException("Expected only 2 invocations"); throw new RuntimeException("Expected only 2 invocations");
} }
@ -197,10 +173,9 @@ public class TestClientScanner {
scanner.loadCache(); scanner.loadCache();
// One for initializeScannerInConstruction()
// One for fetching the results // One for fetching the results
// One for fetching null results and quit as we do not have moreResults hint. // One for fetching empty results and quit as we do not have moreResults hint.
inOrder.verify(caller, Mockito.times(3)).callWithoutRetries( inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
Mockito.any(RetryingCallable.class), Mockito.anyInt()); Mockito.any(RetryingCallable.class), Mockito.anyInt());
assertEquals(1, scanner.cache.size()); assertEquals(1, scanner.cache.size());

View File

@ -162,14 +162,13 @@ public class TestMetaTableAccessorNoCluster {
.thenThrow(new ServiceException("Server not running (1 of 3)")) .thenThrow(new ServiceException("Server not running (1 of 3)"))
.thenThrow(new ServiceException("Server not running (2 of 3)")) .thenThrow(new ServiceException("Server not running (2 of 3)"))
.thenThrow(new ServiceException("Server not running (3 of 3)")) .thenThrow(new ServiceException("Server not running (3 of 3)"))
.thenReturn(ScanResponse.newBuilder().setScannerId(1234567890L).build())
.thenAnswer(new Answer<ScanResponse>() { .thenAnswer(new Answer<ScanResponse>() {
public ScanResponse answer(InvocationOnMock invocation) throws Throwable { public ScanResponse answer(InvocationOnMock invocation) throws Throwable {
((HBaseRpcController) invocation.getArguments()[0]).setCellScanner(CellUtil ((HBaseRpcController) invocation.getArguments()[0]).setCellScanner(CellUtil
.createCellScanner(cellScannables)); .createCellScanner(cellScannables));
return builder.setScannerId(1234567890L).build(); return builder.setScannerId(1234567890L).setMoreResults(false).build();
} }
}).thenReturn(ScanResponse.newBuilder().setMoreResults(false).build()); });
// Associate a spied-upon Connection with UTIL.getConfiguration. Need // Associate a spied-upon Connection with UTIL.getConfiguration. Need
// to shove this in here first so it gets picked up all over; e.g. by // to shove this in here first so it gets picked up all over; e.g. by
// HTable. // HTable.
@ -198,8 +197,8 @@ public class TestMetaTableAccessorNoCluster {
assertTrue(hris.firstEntry().getKey().equals(HRegionInfo.FIRST_META_REGIONINFO)); assertTrue(hris.firstEntry().getKey().equals(HRegionInfo.FIRST_META_REGIONINFO));
assertTrue(Bytes.equals(rowToVerify, hris.firstEntry().getValue().getRow())); assertTrue(Bytes.equals(rowToVerify, hris.firstEntry().getValue().getRow()));
// Finally verify that scan was called four times -- three times // Finally verify that scan was called four times -- three times
// with exception and then on 4th, 5th and 6th attempt we succeed // with exception and then on 4th attempt we succeed
Mockito.verify(implementation, Mockito.times(6)). Mockito.verify(implementation, Mockito.times(4)).
scan((RpcController)Mockito.any(), (ScanRequest)Mockito.any()); scan((RpcController)Mockito.any(), (ScanRequest)Mockito.any());
} finally { } finally {
if (connection != null && !connection.isClosed()) connection.close(); if (connection != null && !connection.isClosed()) connection.close();

View File

@ -175,7 +175,7 @@ public abstract class AbstractTestAsyncTableScan {
@Test @Test
public void testReversedScanWithStartKeyAndStopKey() throws Exception { public void testReversedScanWithStartKeyAndStopKey() throws Exception {
testReversedScan(998, true, 1, false); // from first region to first region testReversedScan(998, true, 1, false); // from last region to first region
testReversedScan(543, true, 321, true); testReversedScan(543, true, 321, true);
testReversedScan(654, true, 432, false); testReversedScan(654, true, 432, false);
testReversedScan(765, false, 543, true); testReversedScan(765, false, 543, true);

View File

@ -155,6 +155,7 @@ public class TestFromClientSide {
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
MultiRowMutationEndpoint.class.getName()); MultiRowMutationEndpoint.class.getName());
conf.setBoolean("hbase.table.sanity.checks", true); // enable for below tests conf.setBoolean("hbase.table.sanity.checks", true); // enable for below tests
conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 6000000);
// We need more than one region server in this test // We need more than one region server in this test
TEST_UTIL.startMiniCluster(SLAVES); TEST_UTIL.startMiniCluster(SLAVES);
} }

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -43,7 +44,6 @@ import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -147,7 +147,7 @@ public class TestScannersFromClientSide {
ht.delete(delete); ht.delete(delete);
// without batch // without batch
scan = new Scan(ROW); scan = new Scan().withStartRow(ROW);
scan.setMaxVersions(); scan.setMaxVersions();
scanner = ht.getScanner(scan); scanner = ht.getScanner(scan);
@ -161,7 +161,7 @@ public class TestScannersFromClientSide {
verifyResult(result, kvListExp, toLog, "Testing first batch of scan"); verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
// with batch // with batch
scan = new Scan(ROW); scan = new Scan().withStartRow(ROW);
scan.setMaxVersions(); scan.setMaxVersions();
scan.setBatch(2); scan.setBatch(2);
scanner = ht.getScanner(scan); scanner = ht.getScanner(scan);
@ -272,7 +272,7 @@ public class TestScannersFromClientSide {
* @param columns * @param columns
* @throws Exception * @throws Exception
*/ */
public void testSmallScan(Table table, boolean reversed, int rows, int columns) throws Exception { private void testSmallScan(Table table, boolean reversed, int rows, int columns) throws Exception {
Scan baseScan = new Scan(); Scan baseScan = new Scan();
baseScan.setReversed(reversed); baseScan.setReversed(reversed);
baseScan.setSmall(true); baseScan.setSmall(true);
@ -299,9 +299,7 @@ public class TestScannersFromClientSide {
Result r = null; Result r = null;
while ((r = scanner.next()) != null) { while ((r = scanner.next()) != null) {
rowCount++; rowCount++;
for (Cell c : r.rawCells()) { cellCount += r.rawCells().length;
cellCount++;
}
} }
assertTrue("Expected row count: " + expectedRowCount + " Actual row count: " + rowCount, assertTrue("Expected row count: " + expectedRowCount + " Actual row count: " + rowCount,
@ -604,7 +602,7 @@ public class TestScannersFromClientSide {
} }
ht.put(put); ht.put(put);
scan = new Scan(ROW); scan = new Scan().withStartRow(ROW);
scanner = ht.getScanner(scan); scanner = ht.getScanner(scan);
HRegionLocation loc; HRegionLocation loc;
@ -769,5 +767,73 @@ public class TestScannersFromClientSide {
assertEquals(expKvList.size(), result.size()); assertEquals(expKvList.size(), result.size());
} }
private void assertResultEquals(Result result, int i) {
assertEquals(String.format("%02d", i), Bytes.toString(result.getRow()));
assertEquals(i, Bytes.toInt(result.getValue(FAMILY, QUALIFIER)));
}
private void testStartRowStopRowInclusive(Table table, int start, boolean startInclusive,
int stop, boolean stopInclusive) throws IOException {
int actualStart = startInclusive ? start : start + 1;
int actualStop = stopInclusive ? stop + 1 : stop;
int expectedCount = actualStop - actualStart;
Result[] results;
try (ResultScanner scanner = table.getScanner(
new Scan().withStartRow(Bytes.toBytes(String.format("%02d", start)), startInclusive)
.withStopRow(Bytes.toBytes(String.format("%02d", stop)), stopInclusive))) {
results = scanner.next(expectedCount);
}
assertEquals(expectedCount, results.length);
for (int i = 0; i < expectedCount; i++) {
assertResultEquals(results[i], actualStart + i);
}
}
private void testReversedStartRowStopRowInclusive(Table table, int start, boolean startInclusive,
int stop, boolean stopInclusive) throws IOException {
int actualStart = startInclusive ? start : start - 1;
int actualStop = stopInclusive ? stop - 1 : stop;
int expectedCount = actualStart - actualStop;
Result[] results;
try (ResultScanner scanner = table.getScanner(
new Scan().withStartRow(Bytes.toBytes(String.format("%02d", start)), startInclusive)
.withStopRow(Bytes.toBytes(String.format("%02d", stop)), stopInclusive)
.setReversed(true))) {
results = scanner.next(expectedCount);
}
assertEquals(expectedCount, results.length);
for (int i = 0; i < expectedCount; i++) {
assertResultEquals(results[i], actualStart - i);
}
}
@Test
public void testStartRowStopRowInclusive() throws IOException, InterruptedException {
TableName tableName = TableName.valueOf("testStartRowStopRowInclusive");
byte[][] splitKeys = new byte[8][];
for (int i = 11; i < 99; i += 11) {
splitKeys[i / 11 - 1] = Bytes.toBytes(String.format("%02d", i));
}
Table table = TEST_UTIL.createTable(tableName, FAMILY, splitKeys);
TEST_UTIL.waitTableAvailable(tableName);
try (BufferedMutator mutator = TEST_UTIL.getConnection().getBufferedMutator(tableName)) {
for (int i = 0; i < 100; i++) {
mutator.mutate(new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, QUALIFIER,
Bytes.toBytes(i)));
}
}
// from first region to last region
testStartRowStopRowInclusive(table, 1, true, 98, false);
testStartRowStopRowInclusive(table, 12, true, 34, true);
testStartRowStopRowInclusive(table, 23, true, 45, false);
testStartRowStopRowInclusive(table, 34, false, 56, true);
testStartRowStopRowInclusive(table, 45, false, 67, false);
// from last region to first region
testReversedStartRowStopRowInclusive(table, 98, true, 1, false);
testReversedStartRowStopRowInclusive(table, 54, true, 32, true);
testReversedStartRowStopRowInclusive(table, 65, true, 43, false);
testReversedStartRowStopRowInclusive(table, 76, false, 54, true);
testReversedStartRowStopRowInclusive(table, 87, false, 65, false);
}
} }