HBASE-17608 Add suspend support for RawScanResultConsumer

This commit is contained in:
zhangduo 2017-02-22 15:26:10 +08:00
parent 93e60153b0
commit f037f230fd
13 changed files with 608 additions and 146 deletions

View File

@ -130,11 +130,12 @@ class AsyncClientScanner {
private void startScan(OpenScannerResponse resp) {
conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
.stub(resp.stub).setScan(scan).consumer(consumer).resultCache(resultCache)
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
.setScan(scan).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp)
.whenComplete((hasMore, error) -> {
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
.start(resp.controller, resp.resp).whenComplete((hasMore, error) -> {
if (error != null) {
consumer.onError(error);
return;

View File

@ -156,6 +156,8 @@ class AsyncRpcRetryingCallerFactory {
private HRegionLocation loc;
private long scannerLeaseTimeoutPeriodNs;
private long scanTimeoutNs;
private long rpcTimeoutNs;
@ -190,6 +192,12 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
public ScanSingleRegionCallerBuilder scannerLeaseTimeoutPeriod(long scannerLeaseTimeoutPeriod,
TimeUnit unit) {
this.scannerLeaseTimeoutPeriodNs = unit.toNanos(scannerLeaseTimeoutPeriod);
return this;
}
public ScanSingleRegionCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) {
this.scanTimeoutNs = unit.toNanos(scanTimeout);
return this;
@ -221,8 +229,8 @@ class AsyncRpcRetryingCallerFactory {
checkNotNull(scan, "scan is null"), scannerId,
checkNotNull(resultCache, "resultCache is null"),
checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"),
checkNotNull(loc, "location is null"), pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
checkNotNull(loc, "location is null"), scannerLeaseTimeoutPeriodNs, pauseNs, maxAttempts,
scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
}
/**
@ -307,7 +315,8 @@ class AsyncRpcRetryingCallerFactory {
private long rpcTimeoutNs = -1L;
public MasterRequestCallerBuilder<T> action(AsyncMasterRequestRpcRetryingCaller.Callable<T> callable) {
public MasterRequestCallerBuilder<T> action(
AsyncMasterRequestRpcRetryingCaller.Callable<T> callable) {
this.callable = callable;
return this;
}
@ -338,9 +347,9 @@ class AsyncRpcRetryingCallerFactory {
}
public AsyncMasterRequestRpcRetryingCaller<T> build() {
return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, checkNotNull(callable,
"action is null"), pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn,
checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs,
rpcTimeoutNs, startLogErrorsCnt);
}
/**

View File

@ -17,13 +17,17 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import com.google.common.base.Preconditions;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import java.io.IOException;
import java.util.ArrayList;
@ -39,6 +43,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.RawScanResultConsumer.ScanResumer;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
@ -76,6 +81,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private final HRegionLocation loc;
private final long scannerLeaseTimeoutPeriodNs;
private final long pauseNs;
private final int maxAttempts;
@ -104,10 +111,176 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private long nextCallSeq = -1L;
private enum ScanControllerState {
INITIALIZED, SUSPENDED, TERMINATED, DESTROYED
}
// Since suspend and terminate should only be called within onNext or onHeartbeat(see the comments
// of RawScanResultConsumer.onNext and onHeartbeat), we need to add some check to prevent invalid
// usage. We use two things to prevent invalid usage:
// 1. Record the thread that construct the ScanControllerImpl instance. We will throw an
// IllegalStateException if the caller thread is not this thread.
// 2. The ControllerState. The initial state is INITIALIZED, if you call suspend, the state will
// be transformed to SUSPENDED, and if you call terminate, the state will be transformed to
// TERMINATED. And when we are back from onNext or onHeartbeat in the onComplete method, we will
// call destroy to get the current state and set the state to DESTROYED. And when user calls
// suspend or terminate, we will check if the current state is INITIALIZED, if not we will throw
// an IllegalStateException. Notice that the DESTROYED state is necessary as you may not call
// suspend or terminate so the state will still be INITIALIZED when back from onNext or
// onHeartbeat. We need another state to replace the INITIALIZED state to prevent the controller
// to be used in the future.
// Notice that, the public methods of this class is supposed to be called by upper layer only, and
// package private methods can only be called within the implementation of
// AsyncScanSingleRegionRpcRetryingCaller.
private final class ScanControllerImpl implements RawScanResultConsumer.ScanController {
// Make sure the methods are only called in this thread.
private final Thread callerThread = Thread.currentThread();
// INITIALIZED -> SUSPENDED -> DESTROYED
// INITIALIZED -> TERMINATED -> DESTROYED
// INITIALIZED -> DESTROYED
// If the state is incorrect we will throw IllegalStateException.
private ScanControllerState state = ScanControllerState.INITIALIZED;
private ScanResumerImpl resumer;
private void preCheck() {
Preconditions.checkState(Thread.currentThread() == callerThread,
"The current thread is %s, expected thread is %s, " +
"you should not call this method outside onNext or onHeartbeat",
Thread.currentThread(), callerThread);
Preconditions.checkState(state.equals(ScanControllerState.INITIALIZED),
"Invalid Stopper state %s", state);
}
@Override
public ScanResumer suspend() {
preCheck();
state = ScanControllerState.SUSPENDED;
ScanResumerImpl resumer = new ScanResumerImpl();
this.resumer = resumer;
return resumer;
}
@Override
public void terminate() {
preCheck();
state = ScanControllerState.TERMINATED;
}
// return the current state, and set the state to DESTROYED.
ScanControllerState destroy() {
ScanControllerState state = this.state;
this.state = ScanControllerState.DESTROYED;
return state;
}
}
private enum ScanResumerState {
INITIALIZED, SUSPENDED, RESUMED
}
// The resume method is allowed to be called in another thread so here we also use the
// ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back
// from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED,
// and when user calls resume method, we will change the state to RESUMED. But the resume method
// could be called in other thread, and in fact, user could just do this:
// controller.suspend().resume()
// This is strange but valid. This means the scan could be resumed before we call the prepare
// method to do the actual suspend work. So in the resume method, we will check if the state is
// INTIALIZED, if it is, then we will just set the state to RESUMED and return. And in prepare
// method, if the state is RESUMED already, we will just return an let the scan go on.
// Notice that, the public methods of this class is supposed to be called by upper layer only, and
// package private methods can only be called within the implementation of
// AsyncScanSingleRegionRpcRetryingCaller.
private final class ScanResumerImpl implements RawScanResultConsumer.ScanResumer {
// INITIALIZED -> SUSPENDED -> RESUMED
// INITIALIZED -> RESUMED
private ScanResumerState state = ScanResumerState.INITIALIZED;
private ScanResponse resp;
private int numValidResults;
// If the scan is suspended successfully, we need to do lease renewal to prevent it being closed
// by RS due to lease expire. It is a one-time timer task so we need to schedule a new task
// every time when the previous task is finished. There could also be race as the renewal is
// executed in the timer thread, so we also need to check the state before lease renewal. If the
// state is RESUMED already, we will give up lease renewal and also not schedule the next lease
// renewal task.
private Timeout leaseRenewer;
@Override
public void resume() {
// just used to fix findbugs warnings. In fact, if resume is called before prepare, then we
// just return at the first if condition without loading the resp and numValidResuls field. If
// resume is called after suspend, then it is also safe to just reference resp and
// numValidResults after the synchronized block as no one will change it anymore.
ScanResponse localResp;
int localNumValidResults;
synchronized (this) {
if (state == ScanResumerState.INITIALIZED) {
// user calls this method before we call prepare, so just set the state to
// RESUMED, the implementation will just go on.
state = ScanResumerState.RESUMED;
return;
}
if (state == ScanResumerState.RESUMED) {
// already resumed, give up.
return;
}
state = ScanResumerState.RESUMED;
if (leaseRenewer != null) {
leaseRenewer.cancel();
}
localResp = this.resp;
localNumValidResults = this.numValidResults;
}
completeOrNext(localResp, localNumValidResults);
}
private void scheduleRenewLeaseTask() {
leaseRenewer = retryTimer.newTimeout(t -> tryRenewLease(), scannerLeaseTimeoutPeriodNs / 2,
TimeUnit.NANOSECONDS);
}
private synchronized void tryRenewLease() {
// the scan has already been resumed, give up
if (state == ScanResumerState.RESUMED) {
return;
}
renewLease();
// schedule the next renew lease task again as this is a one-time task.
scheduleRenewLeaseTask();
}
// return false if the scan has already been resumed. See the comment above for ScanResumerImpl
// for more details.
synchronized boolean prepare(ScanResponse resp, int numValidResults) {
if (state == ScanResumerState.RESUMED) {
// user calls resume before we actually suspend the scan, just continue;
return false;
}
state = ScanResumerState.SUSPENDED;
this.resp = resp;
this.numValidResults = numValidResults;
// if there are no more results in region then the scanner at RS side will be closed
// automatically so we do not need to renew lease.
if (resp.getMoreResultsInRegion()) {
// schedule renew lease task
scheduleRenewLeaseTask();
}
return true;
}
}
public AsyncScanSingleRegionRpcRetryingCaller(HashedWheelTimer retryTimer,
AsyncConnectionImpl conn, Scan scan, long scannerId, ScanResultCache resultCache,
RawScanResultConsumer consumer, Interface stub, HRegionLocation loc, long pauseNs,
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
RawScanResultConsumer consumer, Interface stub, HRegionLocation loc,
long scannerLeaseTimeoutPeriodNs, long pauseNs, int maxAttempts, long scanTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.scan = scan;
this.scannerId = scannerId;
@ -115,6 +288,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
this.consumer = consumer;
this.stub = stub;
this.loc = loc;
this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
this.pauseNs = pauseNs;
this.maxAttempts = maxAttempts;
this.scanTimeoutNs = scanTimeoutNs;
@ -143,9 +317,9 @@ class AsyncScanSingleRegionRpcRetryingCaller {
ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
stub.scan(controller, req, resp -> {
if (controller.failed()) {
LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId
+ " for " + loc.getRegionInfo().getEncodedName() + " of "
+ loc.getRegionInfo().getTable() + " failed, ignore, probably already closed",
LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId +
" for " + loc.getRegionInfo().getEncodedName() + " of " +
loc.getRegionInfo().getTable() + " failed, ignore, probably already closed",
controller.getFailed());
}
});
@ -182,16 +356,15 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private void onError(Throwable error) {
error = translateException(error);
if (tries > startLogErrorsCnt) {
LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for "
+ loc.getRegionInfo().getEncodedName() + " of " + loc.getRegionInfo().getTable()
+ " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
+ TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs()
+ " ms",
LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for " +
loc.getRegionInfo().getEncodedName() + " of " + loc.getRegionInfo().getTable() +
" failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " +
TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs() +
" ms",
error);
}
boolean scannerClosed =
error instanceof UnknownScannerException || error instanceof NotServingRegionException
|| error instanceof RegionServerStoppedException;
boolean scannerClosed = error instanceof UnknownScannerException ||
error instanceof NotServingRegionException || error instanceof RegionServerStoppedException;
RetriesExhaustedException.ThrowableWithExtraContext qt =
new RetriesExhaustedException.ThrowableWithExtraContext(error,
EnvironmentEdgeManager.currentTime(), "");
@ -229,7 +402,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private void updateNextStartRowWhenError(Result result) {
nextStartRowWhenError = result.getRow();
includeNextStartRowWhenError = scan.getBatch() > 0 || result.isPartial();
includeNextStartRowWhenError = result.mayHaveMoreCellsInRow();
}
private void completeWhenNoMoreResultsInRegion() {
@ -248,6 +421,27 @@ class AsyncScanSingleRegionRpcRetryingCaller {
}
}
private void completeOrNext(ScanResponse resp, int numValidResults) {
if (resp.hasMoreResults() && !resp.getMoreResults()) {
// RS tells us there is no more data for the whole scan
completeNoMoreResults();
return;
}
if (scan.getLimit() > 0) {
// The RS should have set the moreResults field in ScanResponse to false when we have reached
// the limit.
int limit = scan.getLimit() - numValidResults;
assert limit > 0;
scan.setLimit(limit);
}
// as in 2.0 this value will always be set
if (!resp.getMoreResultsInRegion()) {
completeWhenNoMoreResultsInRegion.run();
return;
}
next();
}
private void onComplete(HBaseRpcController controller, ScanResponse resp) {
if (controller.failed()) {
onError(controller.getFailed());
@ -269,20 +463,16 @@ class AsyncScanSingleRegionRpcRetryingCaller {
return;
}
boolean stopByUser;
ScanControllerImpl scanController = new ScanControllerImpl();
if (results.length == 0) {
// if we have nothing to return then this must be a heartbeat message.
stopByUser = !consumer.onHeartbeat();
consumer.onHeartbeat(scanController);
} else {
updateNextStartRowWhenError(results[results.length - 1]);
stopByUser = !consumer.onNext(results);
consumer.onNext(results, scanController);
}
if (resp.hasMoreResults() && !resp.getMoreResults()) {
// RS tells us there is no more data for the whole scan
completeNoMoreResults();
return;
}
if (stopByUser) {
ScanControllerState state = scanController.destroy();
if (state == ScanControllerState.TERMINATED) {
if (resp.getMoreResultsInRegion()) {
// we have more results in region but user request to stop the scan, so we need to close the
// scanner explicitly.
@ -291,19 +481,12 @@ class AsyncScanSingleRegionRpcRetryingCaller {
completeNoMoreResults();
return;
}
if (scan.getLimit() > 0) {
// The RS should have set the moreResults field in ScanResponse to false when we have reached
// the limit.
int limit = scan.getLimit() - results.length;
assert limit > 0;
scan.setLimit(limit);
if (state == ScanControllerState.SUSPENDED) {
if (scanController.resumer.prepare(resp, results.length)) {
return;
}
}
// as in 2.0 this value will always be set
if (!resp.getMoreResultsInRegion()) {
completeWhenNoMoreResultsInRegion.run();
return;
}
next();
completeOrNext(resp, results.length);
}
private void call() {
@ -337,6 +520,15 @@ class AsyncScanSingleRegionRpcRetryingCaller {
call();
}
private void renewLease() {
nextCallSeq++;
resetController(controller, rpcTimeoutNs);
ScanRequest req =
RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1);
stub.scan(controller, req, resp -> {
});
}
/**
* Now we will also fetch some cells along with the scanner id when opening a scanner, so we also
* need to process the ScanResponse for the open scanner request. The HBaseRpcController for the

View File

@ -18,8 +18,8 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import java.io.IOException;
@ -29,9 +29,7 @@ import java.util.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
/**
* The {@link ResultScanner} implementation for {@link AsyncTable}. It will fetch data automatically
@ -45,8 +43,6 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
private final RawAsyncTable rawTable;
private final Scan scan;
private final long maxCacheSize;
private final Queue<Result> queue = new ArrayDeque<>();
@ -57,16 +53,10 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
private Throwable error;
private boolean prefetchStopped;
private int numberOfOnCompleteToIgnore;
// used to filter out cells that already returned when we restart a scan
private Cell lastCell;
private ScanResumer resumer;
public AsyncTableResultScanner(RawAsyncTable table, Scan scan, long maxCacheSize) {
this.rawTable = table;
this.scan = scan;
this.maxCacheSize = maxCacheSize;
table.scan(scan, this);
}
@ -76,71 +66,36 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
cacheSize += calcEstimatedSize(result);
}
private void stopPrefetch(Result lastResult) {
prefetchStopped = true;
if (lastResult.isPartial() || scan.getBatch() > 0) {
scan.withStartRow(lastResult.getRow());
lastCell = lastResult.rawCells()[lastResult.rawCells().length - 1];
} else {
scan.withStartRow(lastResult.getRow(), false);
}
private void stopPrefetch(ScanController controller) {
if (LOG.isDebugEnabled()) {
LOG.debug(
String.format("0x%x", System.identityHashCode(this)) + " stop prefetching when scanning "
+ rawTable.getName() + " as the cache size " + cacheSize
+ " is greater than the maxCacheSize " + maxCacheSize + ", the next start row is "
+ Bytes.toStringBinary(scan.getStartRow()) + ", lastCell is " + lastCell);
LOG.debug(String.format("0x%x", System.identityHashCode(this)) +
" stop prefetching when scanning " + rawTable.getName() + " as the cache size " +
cacheSize + " is greater than the maxCacheSize " + maxCacheSize);
}
// Ignore an onComplete call as the scan is stopped by us.
// Here we can not use a simple boolean flag. A scan operation can cross multiple regions and
// the regions may be located on different regionservers, so it is possible that the methods of
// RawScanResultConsumer are called in different rpc framework threads and overlapped with each
// other. It may happen that
// 1. we stop scan1
// 2. we start scan2
// 3. we stop scan2
// 4. onComplete for scan1 is called
// 5. onComplete for scan2 is called
// So if we use a boolean flag here then we can only ignore the onComplete in step4 and think
// that the onComplete in step 5 tells us there is no data.
numberOfOnCompleteToIgnore++;
resumer = controller.suspend();
}
@Override
public synchronized boolean onNext(Result[] results) {
public synchronized void onNext(Result[] results, ScanController controller) {
assert results.length > 0;
if (closed) {
return false;
controller.terminate();
return;
}
Result firstResult = results[0];
if (lastCell != null) {
firstResult = filterCells(firstResult, lastCell);
if (firstResult != null) {
// do not set lastCell to null if the result after filtering is null as there may still be
// other cells that can be filtered out
lastCell = null;
addToCache(firstResult);
} else if (results.length == 1) {
// the only one result is null
return true;
}
} else {
addToCache(firstResult);
}
for (int i = 1; i < results.length; i++) {
addToCache(results[i]);
for (Result result : results) {
addToCache(result);
}
notifyAll();
if (cacheSize < maxCacheSize) {
return true;
if (cacheSize >= maxCacheSize) {
stopPrefetch(controller);
}
stopPrefetch(results[results.length - 1]);
return false;
}
@Override
public synchronized boolean onHeartbeat() {
return !closed;
public synchronized void onHeartbeat(ScanController controller) {
if (closed) {
controller.terminate();
}
}
@Override
@ -150,12 +105,6 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
@Override
public synchronized void onComplete() {
// Do not mark the scanner as closed if the scan is stopped by us due to cache size limit since
// we may resume later by starting a new scan. See resumePrefetch.
if (numberOfOnCompleteToIgnore > 0) {
numberOfOnCompleteToIgnore--;
return;
}
closed = true;
notifyAll();
}
@ -164,8 +113,8 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("0x%x", System.identityHashCode(this)) + " resume prefetching");
}
prefetchStopped = false;
rawTable.scan(scan, this);
resumer.resume();
resumer = null;
}
@Override
@ -186,7 +135,7 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
}
Result result = queue.poll();
cacheSize -= calcEstimatedSize(result);
if (prefetchStopped && cacheSize <= maxCacheSize / 2) {
if (resumer != null && cacheSize <= maxCacheSize / 2) {
resumePrefetch();
}
return result;
@ -197,13 +146,22 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
closed = true;
queue.clear();
cacheSize = 0;
if (resumer != null) {
resumePrefetch();
}
notifyAll();
}
@Override
public boolean renewLease() {
// we will do prefetching in the background and if there is no space we will just terminate the
// background scan operation. So there is no reason to renew lease here.
// we will do prefetching in the background and if there is no space we will just suspend the
// scanner. The renew lease operation will be handled in the background.
return false;
}
// used in tests to test whether the scanner has been suspended
@VisibleForTesting
synchronized boolean isSuspended() {
return resumer != null;
}
}

View File

@ -52,13 +52,13 @@ public interface RawAsyncTable extends AsyncTableBase {
/**
* The basic scan API uses the observer pattern. All results that match the given scan object will
* be passed to the given {@code consumer} by calling
* {@link RawScanResultConsumer#onNext(Result[])}. {@link RawScanResultConsumer#onComplete()}
* means the scan is finished, and {@link RawScanResultConsumer#onError(Throwable)} means we hit
* an unrecoverable error and the scan is terminated. {@link RawScanResultConsumer#onHeartbeat()}
* means the RS is still working but we can not get a valid result to call
* {@link RawScanResultConsumer#onNext(Result[])}. This is usually because the matched results are
* too sparse, for example, a filter which almost filters out everything is specified.
* be passed to the given {@code consumer} by calling {@code RawScanResultConsumer.onNext}.
* {@code RawScanResultConsumer.onComplete} means the scan is finished, and
* {@code RawScanResultConsumer.onError} means we hit an unrecoverable error and the scan is
* terminated. {@code RawScanResultConsumer.onHeartbeat} means the RS is still working but we can
* not get a valid result to call {@code RawScanResultConsumer.onNext}. This is usually because
* the matched results are too sparse, for example, a filter which almost filters out everything
* is specified.
* <p>
* Notice that, the methods of the given {@code consumer} will be called directly in the rpc
* framework's callback thread, so typically you should not do any time consuming work inside

View File

@ -358,9 +358,8 @@ class RawAsyncTableImpl implements RawAsyncTable {
scan(scan, new RawScanResultConsumer() {
@Override
public boolean onNext(Result[] results) {
public void onNext(Result[] results, ScanController controller) {
scanResults.addAll(Arrays.asList(results));
return true;
}
@Override

View File

@ -24,10 +24,10 @@ import org.apache.hadoop.hbase.client.Result;
/**
* Receives {@link Result} for an asynchronous scan.
* <p>
* Notice that, the {@link #onNext(Result[])} method will be called in the thread which we send
* request to HBase service. So if you want the asynchronous scanner fetch data from HBase in
* background while you process the returned data, you need to move the processing work to another
* thread to make the {@code onNext} call return immediately. And please do NOT do any time
* Notice that, the {@link #onNext(Result[], ScanController)} method will be called in the thread
* which we send request to HBase service. So if you want the asynchronous scanner fetch data from
* HBase in background while you process the returned data, you need to move the processing work to
* another thread to make the {@code onNext} call return immediately. And please do NOT do any time
* consuming tasks in all methods below unless you know what you are doing.
*/
@InterfaceAudience.Public
@ -35,20 +35,70 @@ import org.apache.hadoop.hbase.client.Result;
public interface RawScanResultConsumer {
/**
* @param results the data fetched from HBase service.
* @return {@code false} if you want to terminate the scan process. Otherwise {@code true}
* Used to resume a scan.
*/
boolean onNext(Result[] results);
@InterfaceAudience.Public
@InterfaceStability.Unstable
interface ScanResumer {
/**
* Resume the scan. You are free to call it multiple time but only the first call will take
* effect.
*/
void resume();
}
/**
* Used to suspend or stop a scan.
* <p>
* Notice that, you should only call the methods below inside onNext or onHeartbeat method. A
* IllegalStateException will be thrown if you call them at other places.
* <p>
* You can only call one of the methods below, i.e., call suspend or terminate(of course you are
* free to not call them both), and the methods are not reentrant. A IllegalStateException will be
* thrown if you have already called one of the methods.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
interface ScanController {
/**
* Suspend the scan.
* <p>
* This means we will stop fetching data in background, i.e., will not call onNext any more
* before you resume the scan.
* @return A resumer used to resume the scan later.
*/
ScanResumer suspend();
/**
* Terminate the scan.
* <p>
* This is useful when you have got enough results and want to stop the scan in onNext method,
* or you want to stop the scan in onHeartbeat method because it has spent too many time.
*/
void terminate();
}
/**
* Indicate that we have receive some data.
* @param results the data fetched from HBase service.
* @param controller used to suspend or terminate the scan. Notice that the {@code controller}
* instance is only valid within scope of onNext method. You can only call its method in
* onNext, do NOT store it and call it later outside onNext.
*/
void onNext(Result[] results, ScanController controller);
/**
* Indicate that there is an heartbeat message but we have not cumulated enough cells to call
* onNext.
* <p>
* This method give you a chance to terminate a slow scan operation.
* @return {@code false} if you want to terminate the scan process. Otherwise {@code true}
* @param controller used to suspend or terminate the scan. Notice that the {@code controller}
* instance is only valid within the scope of onHeartbeat method. You can only call its
* method in onHeartbeat, do NOT store it and call it later outside onHeartbeat.
*/
default boolean onHeartbeat() {
return true;
default void onHeartbeat(ScanController controller) {
}
/**

View File

@ -147,7 +147,7 @@ public class Threads {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
LOG.warn("sleep interrupted", e);
Thread.currentThread().interrupt();
}
}

View File

@ -407,7 +407,7 @@ public class AsyncAggregationClient {
private R value = null;
@Override
public boolean onNext(Result[] results) {
public void onNext(Result[] results, ScanController controller) {
try {
for (Result result : results) {
Cell weightCell = result.getColumnLatestCell(family, weightQualifier);
@ -419,15 +419,15 @@ public class AsyncAggregationClient {
} else {
future.completeExceptionally(new NoSuchElementException());
}
return false;
controller.terminate();
return;
}
Cell valueCell = result.getColumnLatestCell(family, valueQualifier);
value = ci.getValue(family, valueQualifier, valueCell);
}
return true;
} catch (IOException e) {
future.completeExceptionally(e);
return false;
controller.terminate();
}
}

View File

@ -0,0 +1,150 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncTableScanRenewLease {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("async");
private static byte[] FAMILY = Bytes.toBytes("cf");
private static byte[] CQ = Bytes.toBytes("cq");
private static AsyncConnection CONN;
private static RawAsyncTable TABLE;
private static int SCANNER_LEASE_TIMEOUT_PERIOD_MS = 5000;
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
SCANNER_LEASE_TIMEOUT_PERIOD_MS);
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
TABLE = CONN.getRawTable(TABLE_NAME);
TABLE.putAll(IntStream.range(0, 10).mapToObj(
i -> new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
.collect(Collectors.toList())).get();
}
@AfterClass
public static void tearDown() throws Exception {
CONN.close();
TEST_UTIL.shutdownMiniCluster();
}
private static final class RenewLeaseConsumer implements RawScanResultConsumer {
private final List<Result> results = new ArrayList<>();
private Throwable error;
private boolean finished = false;
private boolean suspended = false;
@Override
public synchronized void onNext(Result[] results, ScanController controller) {
for (Result result : results) {
this.results.add(result);
}
if (!suspended) {
ScanResumer resumer = controller.suspend();
new Thread(() -> {
Threads.sleep(2 * SCANNER_LEASE_TIMEOUT_PERIOD_MS);
try {
TABLE.put(new Put(Bytes.toBytes(String.format("%02d", 10))).addColumn(FAMILY, CQ,
Bytes.toBytes(10))).get();
} catch (Exception e) {
onError(e);
}
resumer.resume();
}).start();
}
}
@Override
public synchronized void onError(Throwable error) {
this.finished = true;
this.error = error;
notifyAll();
}
@Override
public synchronized void onComplete() {
this.finished = true;
notifyAll();
}
public synchronized List<Result> get() throws Throwable {
while (!finished) {
wait();
}
if (error != null) {
throw error;
}
return results;
}
}
@Test
public void test() throws Throwable {
RenewLeaseConsumer consumer = new RenewLeaseConsumer();
TABLE.scan(new Scan(), consumer);
List<Result> results = consumer.get();
// should not see the newly added value
assertEquals(10, results.size());
IntStream.range(0, 10).forEach(i -> {
Result result = results.get(i);
assertEquals(String.format("%02d", i), Bytes.toString(result.getRow()));
assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ)));
});
// now we can see the newly added value
List<Result> results2 = TABLE.scanAll(new Scan()).get();
assertEquals(11, results2.size());
IntStream.range(0, 11).forEach(i -> {
Result result = results2.get(i);
assertEquals(String.format("%02d", i), Bytes.toString(result.getRow()));
assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ)));
});
}
}

View File

@ -82,5 +82,4 @@ public class TestAsyncTableScanner extends AbstractTestAsyncTableScan {
}
return results;
}
}

View File

@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncTableScannerCloseWhileSuspending {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("async");
private static byte[] FAMILY = Bytes.toBytes("cf");
private static byte[] CQ = Bytes.toBytes("cq");
private static AsyncConnection CONN;
private static AsyncTable TABLE;
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
TABLE = CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
TABLE.putAll(IntStream.range(0, 100).mapToObj(
i -> new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
.collect(Collectors.toList())).get();
}
@AfterClass
public static void tearDown() throws Exception {
CONN.close();
TEST_UTIL.shutdownMiniCluster();
}
private int getScannersCount() {
return TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
.map(t -> t.getRegionServer()).mapToInt(rs -> rs.getRSRpcServices().getScannersCount())
.sum();
}
@Test
public void testCloseScannerWhileSuspending() throws Exception {
try (ResultScanner scanner = TABLE.getScanner(new Scan().setMaxResultSize(1))) {
TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return ((AsyncTableResultScanner) scanner).isSuspended();
}
@Override
public String explainFailure() throws Exception {
return "The given scanner has been suspended in time";
}
});
assertEquals(1, getScannersCount());
}
TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return getScannersCount() == 0;
}
@Override
public String explainFailure() throws Exception {
return "Still have " + getScannersCount() + " scanners opened";
}
});
}
}

View File

@ -48,12 +48,11 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
private Throwable error;
@Override
public synchronized boolean onNext(Result[] results) {
public synchronized void onNext(Result[] results, ScanController controller) {
for (Result result : results) {
queue.offer(result);
}
notifyAll();
return true;
}
@Override