HBASE-22322 Use special pause for CallQueueTooBigException

Duo Zhang 2019-04-29 10:20:33 +08:00 committed by Apache9
parent 9743b3c70d
commit f9f6354393
17 changed files with 422 additions and 80 deletions
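The gist of the change: when a server rejects a call with CallQueueTooBigException (CQTBE), the async client now backs off using a dedicated, typically larger base pause instead of the normal retry pause. A minimal sketch of how a client could configure the two pauses, using the constants referenced in the diffs below; the class name and concrete values here are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CqtbePauseExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Normal retry pause, in milliseconds.
    conf.setLong(HConstants.HBASE_CLIENT_PAUSE, 100);
    // Pause used when the server rejects a call with CallQueueTooBigException (milliseconds).
    // If this is smaller than the normal pause, the client falls back to the normal pause.
    conf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, 1000);
    try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) {
      // Use the connection as usual; the CQTBE pause is applied automatically on retries.
    }
  }
}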

AsyncAdminBuilder.java

@@ -36,16 +36,12 @@ public interface AsyncAdminBuilder {
    * Set timeout for a whole admin operation. Operation timeout and max attempt times(or max retry
    * times) are both limitations for retrying, we will stop retrying when we reach any of the
    * limitations.
-   * @param timeout
-   * @param unit
    * @return this for invocation chaining
    */
   AsyncAdminBuilder setOperationTimeout(long timeout, TimeUnit unit);
   /**
    * Set timeout for each rpc request.
-   * @param timeout
-   * @param unit
    * @return this for invocation chaining
    */
   AsyncAdminBuilder setRpcTimeout(long timeout, TimeUnit unit);
@@ -53,17 +49,27 @@ public interface AsyncAdminBuilder {
   /**
    * Set the base pause time for retrying. We use an exponential policy to generate sleep time when
    * retrying.
-   * @param timeout
-   * @param unit
    * @return this for invocation chaining
+   * @see #setRetryPauseForCQTBE(long, TimeUnit)
    */
   AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit);
+  /**
+   * Set the base pause time for retrying when we hit {@code CallQueueTooBigException}. We use an
+   * exponential policy to generate sleep time when retrying.
+   * <p/>
+   * This value should be greater than the normal pause value which could be set with the above
+   * {@link #setRetryPause(long, TimeUnit)} method, as usually {@code CallQueueTooBigException}
+   * means the server is overloaded. We just use the normal pause value for
+   * {@code CallQueueTooBigException} if here you specify a smaller value.
+   * @see #setRetryPause(long, TimeUnit)
+   */
+  AsyncAdminBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit);
   /**
    * Set the max retry times for an admin operation. Usually it is the max attempt times minus 1.
    * Operation timeout and max attempt times(or max retry times) are both limitations for retrying,
    * we will stop retrying when we reach any of the limitations.
-   * @param maxRetries
    * @return this for invocation chaining
    */
   default AsyncAdminBuilder setMaxRetries(int maxRetries) {
@@ -74,14 +80,12 @@ public interface AsyncAdminBuilder {
    * Set the max attempt times for an admin operation. Usually it is the max retry times plus 1.
    * Operation timeout and max attempt times(or max retry times) are both limitations for retrying,
    * we will stop retrying when we reach any of the limitations.
-   * @param maxAttempts
    * @return this for invocation chaining
    */
   AsyncAdminBuilder setMaxAttempts(int maxAttempts);
   /**
    * Set the number of retries that are allowed before we start to log.
-   * @param startLogErrorsCnt
    * @return this for invocation chaining
    */
   AsyncAdminBuilder setStartLogErrorsCnt(int startLogErrorsCnt);
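For illustration, the new builder method composes with the existing ones like this; the values are arbitrary and the snippet assumes an already-created AsyncConnection named conn.

AsyncAdmin admin = conn.getAdminBuilder()
    .setRetryPause(100, TimeUnit.MILLISECONDS)
    // Larger pause reserved for CallQueueTooBigException; smaller values fall back to the normal pause.
    .setRetryPauseForCQTBE(5, TimeUnit.SECONDS)
    .setMaxRetries(10)
    .build();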

AsyncAdminBuilderBase.java

@@ -33,6 +33,8 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder {
   protected long pauseNs;
+  protected long pauseForCQTBENs;
   protected int maxAttempts;
   protected int startLogErrorsCnt;
@@ -41,6 +43,7 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder {
     this.rpcTimeoutNs = connConf.getRpcTimeoutNs();
     this.operationTimeoutNs = connConf.getOperationTimeoutNs();
     this.pauseNs = connConf.getPauseNs();
+    this.pauseForCQTBENs = connConf.getPauseForCQTBENs();
     this.maxAttempts = connConf.getMaxRetries();
     this.startLogErrorsCnt = connConf.getStartLogErrorsCnt();
   }
@@ -63,6 +66,12 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder {
     return this;
   }
+  @Override
+  public AsyncAdminBuilder setRetryPauseForCQTBE(long timeout, TimeUnit unit) {
+    this.pauseForCQTBENs = unit.toNanos(timeout);
+    return this;
+  }
   @Override
   public AsyncAdminBuilder setMaxAttempts(int maxAttempts) {
     this.maxAttempts = maxAttempts;

AsyncAdminRequestRetryingCaller.java

@@ -44,10 +44,10 @@ public class AsyncAdminRequestRetryingCaller<T> extends AsyncRpcRetryingCaller<T
   private ServerName serverName;
   public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
-      long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
-      int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
-    super(retryTimer, conn, priority, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
-      startLogErrorsCnt);
+      long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs,
+      long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
+    super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs,
+      rpcTimeoutNs, startLogErrorsCnt);
     this.serverName = serverName;
     this.callable = callable;
   }

AsyncBatchRpcRetryingCaller.java

@@ -45,6 +45,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
@@ -103,6 +104,8 @@ class AsyncBatchRpcRetryingCaller<T> {
   private final long pauseNs;
+  private final long pauseForCQTBENs;
   private final int maxAttempts;
   private final long operationTimeoutNs;
@@ -147,17 +150,17 @@ class AsyncBatchRpcRetryingCaller<T> {
   }
   public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
-      TableName tableName, List<? extends Row> actions, long pauseNs, int maxAttempts,
-      long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+      TableName tableName, List<? extends Row> actions, long pauseNs, long pauseForCQTBENs,
+      int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
     this.retryTimer = retryTimer;
     this.conn = conn;
     this.tableName = tableName;
     this.pauseNs = pauseNs;
+    this.pauseForCQTBENs = pauseForCQTBENs;
     this.maxAttempts = maxAttempts;
     this.operationTimeoutNs = operationTimeoutNs;
     this.rpcTimeoutNs = rpcTimeoutNs;
     this.startLogErrorsCnt = startLogErrorsCnt;
     this.actions = new ArrayList<>(actions.size());
     this.futures = new ArrayList<>(actions.size());
     this.action2Future = new IdentityHashMap<>(actions.size());
@@ -337,7 +340,7 @@ class AsyncBatchRpcRetryingCaller<T> {
       }
     });
     if (!failedActions.isEmpty()) {
-      tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue());
+      tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), false);
     }
   }
@@ -442,24 +445,27 @@ class AsyncBatchRpcRetryingCaller<T> {
     List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
       .collect(Collectors.toList());
     addError(copiedActions, error, serverName);
-    tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException);
+    tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException,
+      error instanceof CallQueueTooBigException);
   }
-  private void tryResubmit(Stream<Action> actions, int tries, boolean immediately) {
+  private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
+      boolean isCallQueueTooBig) {
     if (immediately) {
       groupAndSend(actions, tries);
       return;
     }
     long delayNs;
+    long pauseNsToUse = isCallQueueTooBig ? pauseForCQTBENs : pauseNs;
     if (operationTimeoutNs > 0) {
       long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
       if (maxDelayNs <= 0) {
         failAll(actions, tries);
         return;
       }
-      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
+      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
     } else {
-      delayNs = getPauseTime(pauseNs, tries - 1);
+      delayNs = getPauseTime(pauseNsToUse, tries - 1);
     }
     retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);
   }
@@ -498,7 +504,7 @@ class AsyncBatchRpcRetryingCaller<T> {
       sendOrDelay(actionsByServer, tries);
     }
     if (!locateFailed.isEmpty()) {
-      tryResubmit(locateFailed.stream(), tries, false);
+      tryResubmit(locateFailed.stream(), tries, false, false);
     }
   });
   }

AsyncClientScanner.java

@@ -73,6 +73,8 @@ class AsyncClientScanner {
   private final long pauseNs;
+  private final long pauseForCQTBENs;
   private final int maxAttempts;
   private final long scanTimeoutNs;
@@ -84,8 +86,8 @@ class AsyncClientScanner {
   private final ScanResultCache resultCache;
   public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
-      AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, int maxAttempts, long scanTimeoutNs,
-      long rpcTimeoutNs, int startLogErrorsCnt) {
+      AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs,
+      int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
     if (scan.getStartRow() == null) {
       scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
     }
@@ -98,6 +100,7 @@ class AsyncClientScanner {
     this.conn = conn;
     this.retryTimer = retryTimer;
     this.pauseNs = pauseNs;
+    this.pauseForCQTBENs = pauseForCQTBENs;
     this.maxAttempts = maxAttempts;
     this.scanTimeoutNs = scanTimeoutNs;
     this.rpcTimeoutNs = rpcTimeoutNs;
@@ -160,14 +163,16 @@ class AsyncClientScanner {
   }
   private void startScan(OpenScannerResponse resp) {
-    addListener(conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId())
-      .location(resp.loc).remote(resp.isRegionServerRemote)
-      .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
-      .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
-      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-      .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
-      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
-      .start(resp.controller, resp.resp), (hasMore, error) -> {
+    addListener(
+      conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
+        .remote(resp.isRegionServerRemote)
+        .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
+        .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
+        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+        .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
+        .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+        .startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp),
+      (hasMore, error) -> {
         if (error != null) {
           consumer.onError(error);
           return;
@@ -185,8 +190,8 @@ class AsyncClientScanner {
       .row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
       .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
       .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
-      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner)
-      .call();
+      .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+      .startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call();
   }
   private long getPrimaryTimeoutNs() {

AsyncConnectionConfiguration.java

@@ -30,6 +30,7 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT;
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE;
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING;
 import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY;
@@ -54,6 +55,8 @@ import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFE
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 /**
  * Timeout configs.
@@ -61,6 +64,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 class AsyncConnectionConfiguration {
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionConfiguration.class);
   private final long metaOperationTimeoutNs;
   // timeout for a whole operation such as get, put or delete. Notice that scan will not be effected
@@ -79,6 +84,8 @@ class AsyncConnectionConfiguration {
   private final long pauseNs;
+  private final long pauseForCQTBENs;
   private final int maxRetries;
   /** How many retries are allowed before we start to log */
@@ -121,8 +128,16 @@ class AsyncConnectionConfiguration {
       TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutNs));
     this.writeRpcTimeoutNs =
       TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutNs));
-    this.pauseNs =
-      TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
+    long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE);
+    long pauseForCQTBEMs = conf.getLong(HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs);
+    if (pauseForCQTBEMs < pauseMs) {
+      LOG.warn(
+        "The {} setting: {} ms is less than the {} setting: {} ms, use the greater one instead",
+        HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseForCQTBEMs, HBASE_CLIENT_PAUSE, pauseMs);
+      pauseForCQTBEMs = pauseMs;
+    }
+    this.pauseNs = TimeUnit.MILLISECONDS.toNanos(pauseMs);
+    this.pauseForCQTBENs = TimeUnit.MILLISECONDS.toNanos(pauseForCQTBEMs);
     this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
     this.startLogErrorsCnt =
       conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
@@ -173,6 +188,10 @@ class AsyncConnectionConfiguration {
     return pauseNs;
   }
+  long getPauseForCQTBENs() {
+    return pauseForCQTBENs;
+  }
   int getMaxRetries() {
     return maxRetries;
   }

AsyncMasterRequestRpcRetryingCaller.java

@@ -44,10 +44,10 @@ public class AsyncMasterRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
   private final Callable<T> callable;
   public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
-      Callable<T> callable, int priority, long pauseNs, int maxRetries, long operationTimeoutNs,
-      long rpcTimeoutNs, int startLogErrorsCnt) {
-    super(retryTimer, conn, priority, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs,
-      startLogErrorsCnt);
+      Callable<T> callable, int priority, long pauseNs, long pauseForCQTBENs, int maxRetries,
+      long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+    super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxRetries, operationTimeoutNs,
+      rpcTimeoutNs, startLogErrorsCnt);
     this.callable = callable;
   }

AsyncRpcRetryingCaller.java

@@ -31,6 +31,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.TableName;
@@ -59,6 +60,8 @@ public abstract class AsyncRpcRetryingCaller<T> {
   private final long pauseNs;
+  private final long pauseForCQTBENs;
   private int tries = 1;
   private final int maxAttempts;
@@ -78,12 +81,13 @@ public abstract class AsyncRpcRetryingCaller<T> {
   protected final HBaseRpcController controller;
   public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
-      long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
-      int startLogErrorsCnt) {
+      long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs,
+      long rpcTimeoutNs, int startLogErrorsCnt) {
     this.retryTimer = retryTimer;
     this.conn = conn;
     this.priority = priority;
     this.pauseNs = pauseNs;
+    this.pauseForCQTBENs = pauseForCQTBENs;
     this.maxAttempts = maxAttempts;
     this.operationTimeoutNs = operationTimeoutNs;
     this.rpcTimeoutNs = rpcTimeoutNs;
@@ -123,6 +127,7 @@ public abstract class AsyncRpcRetryingCaller<T> {
   }
   private void tryScheduleRetry(Throwable error, Consumer<Throwable> updateCachedLocation) {
+    long pauseNsToUse = error instanceof CallQueueTooBigException ? pauseForCQTBENs : pauseNs;
     long delayNs;
     if (operationTimeoutNs > 0) {
       long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
@@ -130,9 +135,9 @@ public abstract class AsyncRpcRetryingCaller<T> {
         completeExceptionally();
         return;
       }
-      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
+      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
     } else {
-      delayNs = getPauseTime(pauseNs, tries - 1);
+      delayNs = getPauseTime(pauseNsToUse, tries - 1);
     }
     tries++;
     retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS);
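To spell out the retry-delay selection the callers now share: the base pause depends on the error type, and the exponential backoff then grows from that base, so CQTBE retries back off from a higher floor on every attempt. A minimal, self-contained sketch in plain Java; it does not call the real getPauseTime helper, and the simple doubling backoff here is an assumption for illustration only.

// Sketch only: the actual client applies its own backoff policy instead of plain doubling.
static long retryDelayNs(Throwable error, int tries, long pauseNs, long pauseForCQTBENs) {
  long baseNs = (error instanceof org.apache.hadoop.hbase.CallQueueTooBigException)
      ? pauseForCQTBENs : pauseNs;
  return baseNs * (1L << Math.min(tries - 1, 6)); // assumed exponential growth, capped
}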

AsyncRpcRetryingCallerFactory.java

@@ -58,6 +58,8 @@ class AsyncRpcRetryingCallerFactory {
   protected long pauseNs = conn.connConf.getPauseNs();
+  protected long pauseForCQTBENs = conn.connConf.getPauseForCQTBENs();
   protected int maxAttempts = retries2Attempts(conn.connConf.getMaxRetries());
   protected int startLogErrorsCnt = conn.connConf.getStartLogErrorsCnt();
@@ -117,6 +119,11 @@ class AsyncRpcRetryingCallerFactory {
     return this;
   }
+  public SingleRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit unit) {
+    this.pauseForCQTBENs = unit.toNanos(pause);
+    return this;
+  }
   public SingleRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
     this.maxAttempts = maxAttempts;
     return this;
@@ -149,8 +156,8 @@ class AsyncRpcRetryingCallerFactory {
   public AsyncSingleRequestRpcRetryingCaller<T> build() {
     preCheck();
     return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId,
-      locateType, callable, priority, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
-      startLogErrorsCnt);
+      locateType, callable, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs,
+      rpcTimeoutNs, startLogErrorsCnt);
   }
   /**
@@ -256,6 +263,11 @@ class AsyncRpcRetryingCallerFactory {
     return this;
   }
+  public ScanSingleRegionCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) {
+    this.pauseForCQTBENs = unit.toNanos(pause);
+    return this;
+  }
   public ScanSingleRegionCallerBuilder maxAttempts(int maxAttempts) {
     this.maxAttempts = maxAttempts;
     return this;
@@ -280,8 +292,8 @@ class AsyncRpcRetryingCallerFactory {
     preCheck();
     return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics,
       scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority,
-      scannerLeaseTimeoutPeriodNs, pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs,
-      startLogErrorsCnt);
+      scannerLeaseTimeoutPeriodNs, pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs,
+      rpcTimeoutNs, startLogErrorsCnt);
   }
   /**
@@ -335,6 +347,11 @@ class AsyncRpcRetryingCallerFactory {
     return this;
   }
+  public BatchCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) {
+    this.pauseForCQTBENs = unit.toNanos(pause);
+    return this;
+  }
   public BatchCallerBuilder maxAttempts(int maxAttempts) {
     this.maxAttempts = maxAttempts;
     return this;
@@ -347,7 +364,7 @@ class AsyncRpcRetryingCallerFactory {
   public <T> AsyncBatchRpcRetryingCaller<T> build() {
     return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs,
-      maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+      pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
   }
   public <T> List<CompletableFuture<T>> call() {
@@ -389,6 +406,11 @@ class AsyncRpcRetryingCallerFactory {
     return this;
   }
+  public MasterRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit unit) {
+    this.pauseForCQTBENs = unit.toNanos(pause);
+    return this;
+  }
   public MasterRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
     this.maxAttempts = maxAttempts;
     return this;
@@ -416,7 +438,7 @@ class AsyncRpcRetryingCallerFactory {
   public AsyncMasterRequestRpcRetryingCaller<T> build() {
     preCheck();
     return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn, callable, priority,
-      pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+      pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
   }
   /**
@@ -465,6 +487,11 @@ class AsyncRpcRetryingCallerFactory {
     return this;
   }
+  public AdminRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit unit) {
+    this.pauseForCQTBENs = unit.toNanos(pause);
+    return this;
+  }
   public AdminRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
     this.maxAttempts = maxAttempts;
     return this;
@@ -487,7 +514,7 @@ class AsyncRpcRetryingCallerFactory {
   public AsyncAdminRequestRetryingCaller<T> build() {
     return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn, priority, pauseNs,
-      maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
+      pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
       checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null"));
   }
@@ -531,6 +558,11 @@ class AsyncRpcRetryingCallerFactory {
     return this;
   }
+  public ServerRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit unit) {
+    this.pauseForCQTBENs = unit.toNanos(pause);
+    return this;
+  }
   public ServerRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
     this.maxAttempts = maxAttempts;
     return this;
@@ -547,8 +579,8 @@ class AsyncRpcRetryingCallerFactory {
   }
   public AsyncServerRequestRpcRetryingCaller<T> build() {
-    return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs, maxAttempts,
-      operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
+    return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn, pauseNs, pauseForCQTBENs,
+      maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
      checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null"));
   }

AsyncScanSingleRegionRpcRetryingCaller.java

@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.NotServingRegionException;
@@ -97,6 +98,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
   private final long pauseNs;
+  private final long pauseForCQTBENs;
   private final int maxAttempts;
   private final long scanTimeoutNs;
@@ -304,7 +307,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
       Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache,
       AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
       boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs,
-      int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+      long pauseForCQTBENs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
+      int startLogErrorsCnt) {
     this.retryTimer = retryTimer;
     this.scan = scan;
     this.scanMetrics = scanMetrics;
@@ -316,6 +320,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     this.regionServerRemote = isRegionServerRemote;
     this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
     this.pauseNs = pauseNs;
+    this.pauseForCQTBENs = pauseForCQTBENs;
     this.maxAttempts = maxAttempts;
     this.scanTimeoutNs = scanTimeoutNs;
     this.rpcTimeoutNs = rpcTimeoutNs;
@@ -405,15 +410,16 @@ class AsyncScanSingleRegionRpcRetryingCaller {
       return;
     }
     long delayNs;
+    long pauseNsToUse = error instanceof CallQueueTooBigException ? pauseForCQTBENs : pauseNs;
     if (scanTimeoutNs > 0) {
       long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
       if (maxDelayNs <= 0) {
         completeExceptionally(!scannerClosed);
         return;
       }
-      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
+      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
     } else {
-      delayNs = getPauseTime(pauseNs, tries - 1);
+      delayNs = getPauseTime(pauseNsToUse, tries - 1);
     }
     if (scannerClosed) {
       completeWhenError(false);

AsyncServerRequestRpcRetryingCaller.java

@@ -46,10 +46,10 @@ public class AsyncServerRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
   private ServerName serverName;
   public AsyncServerRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
-      long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
-      int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
-    super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, maxAttempts, operationTimeoutNs,
-      rpcTimeoutNs, startLogErrorsCnt);
+      long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs,
+      long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
+    super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, pauseForCQTBENs, maxAttempts,
+      operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
     this.serverName = serverName;
     this.callable = callable;
   }

AsyncSingleRequestRpcRetryingCaller.java

@@ -56,10 +56,10 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
   public AsyncSingleRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
       TableName tableName, byte[] row, int replicaId, RegionLocateType locateType,
-      Callable<T> callable, int priority, long pauseNs, int maxAttempts, long operationTimeoutNs,
-      long rpcTimeoutNs, int startLogErrorsCnt) {
-    super(retryTimer, conn, priority, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
-      startLogErrorsCnt);
+      Callable<T> callable, int priority, long pauseNs, long pauseForCQTBENs, int maxAttempts,
+      long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+    super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs,
+      rpcTimeoutNs, startLogErrorsCnt);
     this.tableName = tableName;
     this.row = row;
     this.replicaId = replicaId;

AsyncTableBuilder.java

@@ -76,9 +76,22 @@ public interface AsyncTableBuilder<C extends ScanResultConsumerBase> {
   /**
    * Set the base pause time for retrying. We use an exponential policy to generate sleep time when
    * retrying.
+   * @see #setRetryPauseForCQTBE(long, TimeUnit)
    */
   AsyncTableBuilder<C> setRetryPause(long pause, TimeUnit unit);
+  /**
+   * Set the base pause time for retrying when we hit {@code CallQueueTooBigException}. We use an
+   * exponential policy to generate sleep time when retrying.
+   * <p/>
+   * This value should be greater than the normal pause value which could be set with the above
+   * {@link #setRetryPause(long, TimeUnit)} method, as usually {@code CallQueueTooBigException}
+   * means the server is overloaded. We just use the normal pause value for
+   * {@code CallQueueTooBigException} if here you specify a smaller value.
+   * @see #setRetryPause(long, TimeUnit)
+   */
+  AsyncTableBuilder<C> setRetryPauseForCQTBE(long pause, TimeUnit unit);
   /**
    * Set the max retry times for an operation. Usually it is the max attempt times minus 1.
    * <p>

AsyncTableBuilderBase.java

@@ -45,6 +45,8 @@ abstract class AsyncTableBuilderBase<C extends ScanResultConsumerBase>
   protected long pauseNs;
+  protected long pauseForCQTBENs;
   protected int maxAttempts;
   protected int startLogErrorsCnt;
@@ -58,6 +60,7 @@ abstract class AsyncTableBuilderBase<C extends ScanResultConsumerBase>
     this.readRpcTimeoutNs = connConf.getReadRpcTimeoutNs();
     this.writeRpcTimeoutNs = connConf.getWriteRpcTimeoutNs();
     this.pauseNs = connConf.getPauseNs();
+    this.pauseForCQTBENs = connConf.getPauseForCQTBENs();
     this.maxAttempts = retries2Attempts(connConf.getMaxRetries());
     this.startLogErrorsCnt = connConf.getStartLogErrorsCnt();
   }
@@ -98,6 +101,12 @@ abstract class AsyncTableBuilderBase<C extends ScanResultConsumerBase>
     return this;
   }
+  @Override
+  public AsyncTableBuilderBase<C> setRetryPauseForCQTBE(long pause, TimeUnit unit) {
+    this.pauseForCQTBENs = unit.toNanos(pause);
+    return this;
+  }
   @Override
   public AsyncTableBuilderBase<C> setMaxAttempts(int maxAttempts) {
     this.maxAttempts = maxAttempts;

RawAsyncHBaseAdmin.java

@@ -327,6 +327,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   private final long pauseNs;
+  private final long pauseForCQTBENs;
   private final int maxAttempts;
   private final int startLogErrorsCnt;
@@ -341,6 +343,16 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     this.rpcTimeoutNs = builder.rpcTimeoutNs;
     this.operationTimeoutNs = builder.operationTimeoutNs;
     this.pauseNs = builder.pauseNs;
+    if (builder.pauseForCQTBENs < builder.pauseNs) {
+      LOG.warn(
+        "Configured value of pauseForCQTBENs is {} ms, which is less than" +
+          " the normal pause value {} ms, use the greater one instead",
+        TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
+        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
+      this.pauseForCQTBENs = builder.pauseNs;
+    } else {
+      this.pauseForCQTBENs = builder.pauseForCQTBENs;
+    }
     this.maxAttempts = builder.maxAttempts;
     this.startLogErrorsCnt = builder.startLogErrorsCnt;
     this.ng = connection.getNonceGenerator();
@@ -348,18 +360,18 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   private <T> MasterRequestCallerBuilder<T> newMasterCaller() {
     return this.connection.callerFactory.<T> masterRequest()
       .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
       .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
-      .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
-      .startLogErrorsCnt(startLogErrorsCnt);
+      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
+      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
   }
   private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
     return this.connection.callerFactory.<T> adminRequest()
       .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
       .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
-      .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
-      .startLogErrorsCnt(startLogErrorsCnt);
+      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
+      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
   }
   @FunctionalInterface
@@ -3357,10 +3369,10 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
   private <T> ServerRequestCallerBuilder<T> newServerCaller() {
     return this.connection.callerFactory.<T> serverRequest()
       .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
       .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
-      .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
-      .startLogErrorsCnt(startLogErrorsCnt);
+      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
+      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
   }
   @Override

RawAsyncTableImpl.java

@@ -46,6 +46,8 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
@@ -77,6 +79,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType
 @InterfaceAudience.Private
 class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
+  private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class);
   private final AsyncConnectionImpl conn;
   private final Timer retryTimer;
@@ -99,6 +103,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
   private final long pauseNs;
+  private final long pauseForCQTBENs;
   private final int maxAttempts;
   private final int startLogErrorsCnt;
@@ -113,6 +119,16 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
     this.operationTimeoutNs = builder.operationTimeoutNs;
     this.scanTimeoutNs = builder.scanTimeoutNs;
     this.pauseNs = builder.pauseNs;
+    if (builder.pauseForCQTBENs < builder.pauseNs) {
+      LOG.warn(
+        "Configured value of pauseForCQTBENs is {} ms, which is less than" +
+          " the normal pause value {} ms, use the greater one instead",
+        TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
+        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
+      this.pauseForCQTBENs = builder.pauseNs;
+    } else {
+      this.pauseForCQTBENs = builder.pauseForCQTBENs;
+    }
     this.maxAttempts = builder.maxAttempts;
     this.startLogErrorsCnt = builder.startLogErrorsCnt;
     this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching()
@@ -220,8 +236,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
     return conn.callerFactory.<T> single().table(tableName).row(row).priority(priority)
       .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
       .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
-      .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
-      .startLogErrorsCnt(startLogErrorsCnt);
+      .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
+      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
   }
   private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T> newCaller(
@@ -451,7 +467,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
   @Override
   public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
     new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer,
-      pauseNs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
+      pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt)
+        .start();
   }
   private long resultSize2CacheSize(long maxResultSize) {
@@ -521,7 +538,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
     return conn.callerFactory.batch().table(tableName).actions(actions)
       .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
       .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
-      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
+      .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+      .startLogErrorsCnt(startLogErrorsCnt).call();
   }
   @Override

TestAsyncClientPauseForCallQueueTooBig.java (new file)

@@ -0,0 +1,204 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.CallRunner;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncClientPauseForCallQueueTooBig {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncClientPauseForCallQueueTooBig.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("CQTBE");
private static byte[] FAMILY = Bytes.toBytes("Family");
private static byte[] QUALIFIER = Bytes.toBytes("Qualifier");
private static long PAUSE_FOR_CQTBE_NS = TimeUnit.SECONDS.toNanos(1);
private static AsyncConnection CONN;
private static boolean FAIL = false;
private static ConcurrentMap<MethodDescriptor, AtomicInteger> INVOKED = new ConcurrentHashMap<>();
public static final class CQTBERpcScheduler extends SimpleRpcScheduler {
public CQTBERpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority,
Abortable server, int highPriorityLevel) {
super(conf, handlerCount, priorityHandlerCount, replicationHandlerCount,
metaTransitionHandler, priority, server, highPriorityLevel);
}
@Override
public boolean dispatch(CallRunner callTask) throws InterruptedException {
if (FAIL) {
MethodDescriptor method = callTask.getRpcCall().getMethod();
// this is for test scan, where we will send a open scanner first and then a next, and we
// expect that we hit CQTBE two times.
if (INVOKED.computeIfAbsent(method, k -> new AtomicInteger(0)).getAndIncrement() % 2 == 0) {
return false;
}
}
return super.dispatch(callTask);
}
}
public static final class CQTBERpcSchedulerFactory extends SimpleRpcSchedulerFactory {
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
return new CQTBERpcScheduler(conf, handlerCount,
conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT),
conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
conf.getInt(HConstants.MASTER_META_TRANSITION_HANDLER_COUNT,
HConstants.MASTER__META_TRANSITION_HANDLER_COUNT_DEFAULT),
priority, server, HConstants.QOS_THRESHOLD);
}
}
@BeforeClass
public static void setUp() throws Exception {
UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10);
UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE,
TimeUnit.NANOSECONDS.toMillis(PAUSE_FOR_CQTBE_NS));
UTIL.getConfiguration().setClass(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
CQTBERpcSchedulerFactory.class, RpcSchedulerFactory.class);
UTIL.startMiniCluster(1);
CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
}
@AfterClass
public static void tearDown() throws Exception {
Closeables.close(CONN, true);
UTIL.shutdownMiniCluster();
}
@Before
public void setUpBeforeTest() throws IOException {
try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)));
}
}
FAIL = true;
}
@After
public void tearDownAfterTest() throws IOException {
FAIL = false;
INVOKED.clear();
UTIL.getAdmin().disableTable(TABLE_NAME);
UTIL.getAdmin().deleteTable(TABLE_NAME);
}
private void assertTime(Callable<Void> callable, long time) throws Exception {
long startNs = System.nanoTime();
callable.call();
long costNs = System.nanoTime() - startNs;
assertTrue(costNs > time);
}
@Test
public void testGet() throws Exception {
assertTime(() -> {
Result result = CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).get();
assertArrayEquals(Bytes.toBytes(0), result.getValue(FAMILY, QUALIFIER));
return null;
}, PAUSE_FOR_CQTBE_NS);
}
@Test
public void testBatch() throws Exception {
assertTime(() -> {
List<CompletableFuture<?>> futures = new ArrayList<>();
try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) {
for (int i = 100; i < 110; i++) {
futures.add(mutator
.mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
}
}
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
}, PAUSE_FOR_CQTBE_NS);
}
@Test
public void testScan() throws Exception {
// we will hit CallQueueTooBigException two times so the sleep time should be twice
assertTime(() -> {
try (
ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) {
for (int i = 0; i < 100; i++) {
Result result = scanner.next();
assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER));
}
assertNull(scanner.next());
}
return null;
}, PAUSE_FOR_CQTBE_NS * 2);
}
}