From f9f63543933616ca887fa2dc954dfd7e649d0461 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Mon, 29 Apr 2019 10:20:33 +0800 Subject: [PATCH] HBASE-22322 Use special pause for CallQueueTooBigException --- .../hbase/client/AsyncAdminBuilder.java | 22 +- .../hbase/client/AsyncAdminBuilderBase.java | 9 + .../AsyncAdminRequestRetryingCaller.java | 8 +- .../client/AsyncBatchRpcRetryingCaller.java | 24 ++- .../hbase/client/AsyncClientScanner.java | 29 +-- .../client/AsyncConnectionConfiguration.java | 23 +- .../AsyncMasterRequestRpcRetryingCaller.java | 8 +- .../hbase/client/AsyncRpcRetryingCaller.java | 13 +- .../client/AsyncRpcRetryingCallerFactory.java | 50 ++++- ...syncScanSingleRegionRpcRetryingCaller.java | 12 +- .../AsyncServerRequestRpcRetryingCaller.java | 8 +- .../AsyncSingleRequestRpcRetryingCaller.java | 8 +- .../hbase/client/AsyncTableBuilder.java | 13 ++ .../hbase/client/AsyncTableBuilderBase.java | 9 + .../hbase/client/RawAsyncHBaseAdmin.java | 36 ++-- .../hbase/client/RawAsyncTableImpl.java | 26 ++- ...estAsyncClientPauseForCallQueueTooBig.java | 204 ++++++++++++++++++ 17 files changed, 422 insertions(+), 80 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForCallQueueTooBig.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java index 6a8db9e9ec9..49bc350bb9a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java @@ -36,16 +36,12 @@ public interface AsyncAdminBuilder { * Set timeout for a whole admin operation. Operation timeout and max attempt times(or max retry * times) are both limitations for retrying, we will stop retrying when we reach any of the * limitations. 
- * @param timeout - * @param unit * @return this for invocation chaining */ AsyncAdminBuilder setOperationTimeout(long timeout, TimeUnit unit); /** * Set timeout for each rpc request. - * @param timeout - * @param unit * @return this for invocation chaining */ AsyncAdminBuilder setRpcTimeout(long timeout, TimeUnit unit); @@ -53,17 +49,27 @@ public interface AsyncAdminBuilder { /** * Set the base pause time for retrying. We use an exponential policy to generate sleep time when * retrying. - * @param timeout - * @param unit * @return this for invocation chaining + * @see #setRetryPauseForCQTBE(long, TimeUnit) */ AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit); + /** + * Set the base pause time for retrying when we hit {@code CallQueueTooBigException}. We use an + * exponential policy to generate sleep time when retrying. + *
<p/>
+ * This value should be greater than the normal pause value which could be set with the above + * {@link #setRetryPause(long, TimeUnit)} method, as usually {@code CallQueueTooBigException} + * means the server is overloaded. We just use the normal pause value for + * {@code CallQueueTooBigException} if here you specify a smaller value. + * @see #setRetryPause(long, TimeUnit) + */ + AsyncAdminBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit); + /** * Set the max retry times for an admin operation. Usually it is the max attempt times minus 1. * Operation timeout and max attempt times(or max retry times) are both limitations for retrying, * we will stop retrying when we reach any of the limitations. - * @param maxRetries * @return this for invocation chaining */ default AsyncAdminBuilder setMaxRetries(int maxRetries) { @@ -74,14 +80,12 @@ public interface AsyncAdminBuilder { * Set the max attempt times for an admin operation. Usually it is the max retry times plus 1. * Operation timeout and max attempt times(or max retry times) are both limitations for retrying, * we will stop retrying when we reach any of the limitations. - * @param maxAttempts * @return this for invocation chaining */ AsyncAdminBuilder setMaxAttempts(int maxAttempts); /** * Set the number of retries that are allowed before we start to log. 
- * @param startLogErrorsCnt * @return this for invocation chaining */ AsyncAdminBuilder setStartLogErrorsCnt(int startLogErrorsCnt); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java index 00896efc00d..ffb3ae97ecf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java @@ -33,6 +33,8 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder { protected long pauseNs; + protected long pauseForCQTBENs; + protected int maxAttempts; protected int startLogErrorsCnt; @@ -41,6 +43,7 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder { this.rpcTimeoutNs = connConf.getRpcTimeoutNs(); this.operationTimeoutNs = connConf.getOperationTimeoutNs(); this.pauseNs = connConf.getPauseNs(); + this.pauseForCQTBENs = connConf.getPauseForCQTBENs(); this.maxAttempts = connConf.getMaxRetries(); this.startLogErrorsCnt = connConf.getStartLogErrorsCnt(); } @@ -63,6 +66,12 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder { return this; } + @Override + public AsyncAdminBuilder setRetryPauseForCQTBE(long timeout, TimeUnit unit) { + this.pauseForCQTBENs = unit.toNanos(timeout); + return this; + } + @Override public AsyncAdminBuilder setMaxAttempts(int maxAttempts) { this.maxAttempts = maxAttempts; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java index ce0fca7eb04..7a381db39c8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java @@ -44,10 +44,10 @@ public class 
AsyncAdminRequestRetryingCaller extends AsyncRpcRetryingCaller callable) { - super(retryTimer, conn, priority, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, - startLogErrorsCnt); + long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs, + long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable callable) { + super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs, + rpcTimeoutNs, startLogErrorsCnt); this.serverName = serverName; this.callable = callable; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index e429422d1c1..464eff54fcb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -45,6 +45,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang3.mutable.MutableBoolean; +import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; @@ -103,6 +104,8 @@ class AsyncBatchRpcRetryingCaller { private final long pauseNs; + private final long pauseForCQTBENs; + private final int maxAttempts; private final long operationTimeoutNs; @@ -147,17 +150,17 @@ class AsyncBatchRpcRetryingCaller { } public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, - TableName tableName, List actions, long pauseNs, int maxAttempts, - long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + TableName tableName, List actions, long pauseNs, long pauseForCQTBENs, + int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { 
this.retryTimer = retryTimer; this.conn = conn; this.tableName = tableName; this.pauseNs = pauseNs; + this.pauseForCQTBENs = pauseForCQTBENs; this.maxAttempts = maxAttempts; this.operationTimeoutNs = operationTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs; this.startLogErrorsCnt = startLogErrorsCnt; - this.actions = new ArrayList<>(actions.size()); this.futures = new ArrayList<>(actions.size()); this.action2Future = new IdentityHashMap<>(actions.size()); @@ -337,7 +340,7 @@ class AsyncBatchRpcRetryingCaller { } }); if (!failedActions.isEmpty()) { - tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue()); + tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), false); } } @@ -442,24 +445,27 @@ class AsyncBatchRpcRetryingCaller { List copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream()) .collect(Collectors.toList()); addError(copiedActions, error, serverName); - tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException); + tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException, + error instanceof CallQueueTooBigException); } - private void tryResubmit(Stream actions, int tries, boolean immediately) { + private void tryResubmit(Stream actions, int tries, boolean immediately, + boolean isCallQueueTooBig) { if (immediately) { groupAndSend(actions, tries); return; } long delayNs; + long pauseNsToUse = isCallQueueTooBig ? 
pauseForCQTBENs : pauseNs; if (operationTimeoutNs > 0) { long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; if (maxDelayNs <= 0) { failAll(actions, tries); return; } - delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1)); + delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1)); } else { - delayNs = getPauseTime(pauseNs, tries - 1); + delayNs = getPauseTime(pauseNsToUse, tries - 1); } retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS); } @@ -498,7 +504,7 @@ class AsyncBatchRpcRetryingCaller { sendOrDelay(actionsByServer, tries); } if (!locateFailed.isEmpty()) { - tryResubmit(locateFailed.stream(), tries, false); + tryResubmit(locateFailed.stream(), tries, false, false); } }); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java index d6cca48e9c5..5fd00a5f6ed 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -73,6 +73,8 @@ class AsyncClientScanner { private final long pauseNs; + private final long pauseForCQTBENs; + private final int maxAttempts; private final long scanTimeoutNs; @@ -84,8 +86,8 @@ class AsyncClientScanner { private final ScanResultCache resultCache; public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName, - AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, int maxAttempts, long scanTimeoutNs, - long rpcTimeoutNs, int startLogErrorsCnt) { + AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs, + int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { if (scan.getStartRow() == null) { scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow()); } @@ -98,6 +100,7 @@ class AsyncClientScanner { this.conn = conn; this.retryTimer = 
retryTimer; this.pauseNs = pauseNs; + this.pauseForCQTBENs = pauseForCQTBENs; this.maxAttempts = maxAttempts; this.scanTimeoutNs = scanTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs; @@ -160,14 +163,16 @@ class AsyncClientScanner { } private void startScan(OpenScannerResponse resp) { - addListener(conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()) - .location(resp.loc).remote(resp.isRegionServerRemote) - .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub) - .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache) - .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) - .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) - .start(resp.controller, resp.resp), (hasMore, error) -> { + addListener( + conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc) + .remote(resp.isRegionServerRemote) + .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub) + .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache) + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) + .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) + .startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp), + (hasMore, error) -> { if (error != null) { consumer.onError(error); return; @@ -185,8 +190,8 @@ class AsyncClientScanner { .row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan)) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) - .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner) - .call(); + .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) + 
.startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call(); } private long getPrimaryTimeoutNs() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java index 22042c9b831..6596578d7fd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java @@ -30,6 +30,7 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_ import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY; @@ -54,6 +55,8 @@ import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFE import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Timeout configs. @@ -61,6 +64,8 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private class AsyncConnectionConfiguration { + private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionConfiguration.class); + private final long metaOperationTimeoutNs; // timeout for a whole operation such as get, put or delete. 
Notice that scan will not be effected @@ -79,6 +84,8 @@ class AsyncConnectionConfiguration { private final long pauseNs; + private final long pauseForCQTBENs; + private final int maxRetries; /** How many retries are allowed before we start to log */ @@ -121,8 +128,16 @@ class AsyncConnectionConfiguration { TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutNs)); this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutNs)); - this.pauseNs = - TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE)); + long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE); + long pauseForCQTBEMs = conf.getLong(HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs); + if (pauseForCQTBEMs < pauseMs) { + LOG.warn( + "The {} setting: {} ms is less than the {} setting: {} ms, use the greater one instead", + HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseForCQTBEMs, HBASE_CLIENT_PAUSE, pauseMs); + pauseForCQTBEMs = pauseMs; + } + this.pauseNs = TimeUnit.MILLISECONDS.toNanos(pauseMs); + this.pauseForCQTBENs = TimeUnit.MILLISECONDS.toNanos(pauseForCQTBEMs); this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); @@ -173,6 +188,10 @@ class AsyncConnectionConfiguration { return pauseNs; } + long getPauseForCQTBENs() { + return pauseForCQTBENs; + } + int getMaxRetries() { return maxRetries; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java index 5ba4dee525b..de2778cf6d7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java @@ -44,10 +44,10 @@ public class AsyncMasterRequestRpcRetryingCaller extends AsyncRpcRetryingCall private final Callable callable; public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, - Callable callable, int priority, long pauseNs, int maxRetries, long operationTimeoutNs, - long rpcTimeoutNs, int startLogErrorsCnt) { - super(retryTimer, conn, priority, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs, - startLogErrorsCnt); + Callable callable, int priority, long pauseNs, long pauseForCQTBENs, int maxRetries, + long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxRetries, operationTimeoutNs, + rpcTimeoutNs, startLogErrorsCnt); this.callable = callable; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java index 387b103a266..dcf7aa154f9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java @@ -31,6 +31,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Supplier; +import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; @@ -59,6 +60,8 @@ public abstract class AsyncRpcRetryingCaller { private final long pauseNs; + private final long pauseForCQTBENs; + private int tries = 1; private final int maxAttempts; @@ -78,12 +81,13 @@ public abstract class AsyncRpcRetryingCaller { protected final HBaseRpcController controller; public 
AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority, - long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, - int startLogErrorsCnt) { + long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs, + long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; this.conn = conn; this.priority = priority; this.pauseNs = pauseNs; + this.pauseForCQTBENs = pauseForCQTBENs; this.maxAttempts = maxAttempts; this.operationTimeoutNs = operationTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs; @@ -123,6 +127,7 @@ public abstract class AsyncRpcRetryingCaller { } private void tryScheduleRetry(Throwable error, Consumer updateCachedLocation) { + long pauseNsToUse = error instanceof CallQueueTooBigException ? pauseForCQTBENs : pauseNs; long delayNs; if (operationTimeoutNs > 0) { long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; @@ -130,9 +135,9 @@ public abstract class AsyncRpcRetryingCaller { completeExceptionally(); return; } - delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1)); + delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1)); } else { - delayNs = getPauseTime(pauseNs, tries - 1); + delayNs = getPauseTime(pauseNsToUse, tries - 1); } tries++; retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index 513f813c6e6..48bde4434be 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -58,6 +58,8 @@ class AsyncRpcRetryingCallerFactory { protected long pauseNs = conn.connConf.getPauseNs(); + protected long pauseForCQTBENs = conn.connConf.getPauseForCQTBENs(); + protected int maxAttempts = 
retries2Attempts(conn.connConf.getMaxRetries()); protected int startLogErrorsCnt = conn.connConf.getStartLogErrorsCnt(); @@ -117,6 +119,11 @@ class AsyncRpcRetryingCallerFactory { return this; } + public SingleRequestCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { + this.pauseForCQTBENs = unit.toNanos(pause); + return this; + } + public SingleRequestCallerBuilder maxAttempts(int maxAttempts) { this.maxAttempts = maxAttempts; return this; @@ -149,8 +156,8 @@ class AsyncRpcRetryingCallerFactory { public AsyncSingleRequestRpcRetryingCaller build() { preCheck(); return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId, - locateType, callable, priority, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, - startLogErrorsCnt); + locateType, callable, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs, + rpcTimeoutNs, startLogErrorsCnt); } /** @@ -256,6 +263,11 @@ class AsyncRpcRetryingCallerFactory { return this; } + public ScanSingleRegionCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { + this.pauseForCQTBENs = unit.toNanos(pause); + return this; + } + public ScanSingleRegionCallerBuilder maxAttempts(int maxAttempts) { this.maxAttempts = maxAttempts; return this; @@ -280,8 +292,8 @@ class AsyncRpcRetryingCallerFactory { preCheck(); return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics, scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority, - scannerLeaseTimeoutPeriodNs, pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, - startLogErrorsCnt); + scannerLeaseTimeoutPeriodNs, pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs, + rpcTimeoutNs, startLogErrorsCnt); } /** @@ -335,6 +347,11 @@ class AsyncRpcRetryingCallerFactory { return this; } + public BatchCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { + this.pauseForCQTBENs = unit.toNanos(pause); + return this; + } + public BatchCallerBuilder maxAttempts(int maxAttempts) { 
this.maxAttempts = maxAttempts; return this; @@ -347,7 +364,7 @@ class AsyncRpcRetryingCallerFactory { public AsyncBatchRpcRetryingCaller build() { return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs, - maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); } public List> call() { @@ -389,6 +406,11 @@ class AsyncRpcRetryingCallerFactory { return this; } + public MasterRequestCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { + this.pauseForCQTBENs = unit.toNanos(pause); + return this; + } + public MasterRequestCallerBuilder maxAttempts(int maxAttempts) { this.maxAttempts = maxAttempts; return this; @@ -416,7 +438,7 @@ class AsyncRpcRetryingCallerFactory { public AsyncMasterRequestRpcRetryingCaller build() { preCheck(); return new AsyncMasterRequestRpcRetryingCaller(retryTimer, conn, callable, priority, - pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); } /** @@ -465,6 +487,11 @@ class AsyncRpcRetryingCallerFactory { return this; } + public AdminRequestCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { + this.pauseForCQTBENs = unit.toNanos(pause); + return this; + } + public AdminRequestCallerBuilder maxAttempts(int maxAttempts) { this.maxAttempts = maxAttempts; return this; @@ -487,7 +514,7 @@ class AsyncRpcRetryingCallerFactory { public AsyncAdminRequestRetryingCaller build() { return new AsyncAdminRequestRetryingCaller(retryTimer, conn, priority, pauseNs, - maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, + pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null")); } @@ -531,6 +558,11 @@ class AsyncRpcRetryingCallerFactory { return this; } + public 
ServerRequestCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { + this.pauseForCQTBENs = unit.toNanos(pause); + return this; + } + public ServerRequestCallerBuilder maxAttempts(int maxAttempts) { this.maxAttempts = maxAttempts; return this; @@ -547,8 +579,8 @@ class AsyncRpcRetryingCallerFactory { } public AsyncServerRequestRpcRetryingCaller build() { - return new AsyncServerRequestRpcRetryingCaller(retryTimer, conn, pauseNs, maxAttempts, - operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, + return new AsyncServerRequestRpcRetryingCaller(retryTimer, conn, pauseNs, pauseForCQTBENs, + maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null")); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index b87d1701c0f..1fa3c81e5d1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.NotServingRegionException; @@ -97,6 +98,8 @@ class AsyncScanSingleRegionRpcRetryingCaller { private final long pauseNs; + private final long pauseForCQTBENs; + private final int maxAttempts; private final long scanTimeoutNs; @@ -304,7 +307,8 @@ class AsyncScanSingleRegionRpcRetryingCaller { Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache, AdvancedScanResultConsumer consumer, Interface 
stub, HRegionLocation loc, boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs, - int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + long pauseForCQTBENs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, + int startLogErrorsCnt) { this.retryTimer = retryTimer; this.scan = scan; this.scanMetrics = scanMetrics; @@ -316,6 +320,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { this.regionServerRemote = isRegionServerRemote; this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs; this.pauseNs = pauseNs; + this.pauseForCQTBENs = pauseForCQTBENs; this.maxAttempts = maxAttempts; this.scanTimeoutNs = scanTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs; @@ -405,15 +410,16 @@ class AsyncScanSingleRegionRpcRetryingCaller { return; } long delayNs; + long pauseNsToUse = error instanceof CallQueueTooBigException ? pauseForCQTBENs : pauseNs; if (scanTimeoutNs > 0) { long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; if (maxDelayNs <= 0) { completeExceptionally(!scannerClosed); return; } - delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1)); + delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1)); } else { - delayNs = getPauseTime(pauseNs, tries - 1); + delayNs = getPauseTime(pauseNsToUse, tries - 1); } if (scannerClosed) { completeWhenError(false); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java index 63c85c210e7..52a2abe3944 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java @@ -46,10 +46,10 @@ public class AsyncServerRequestRpcRetryingCaller extends AsyncRpcRetryingCall private ServerName serverName; public 
AsyncServerRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, - long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, - int startLogErrorsCnt, ServerName serverName, Callable callable) { - super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, maxAttempts, operationTimeoutNs, - rpcTimeoutNs, startLogErrorsCnt); + long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs, + long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable callable) { + super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, pauseForCQTBENs, maxAttempts, + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); this.serverName = serverName; this.callable = callable; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java index 9b0dede5ed4..2a552c71b3d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java @@ -56,10 +56,10 @@ class AsyncSingleRequestRpcRetryingCaller extends AsyncRpcRetryingCaller { public AsyncSingleRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, TableName tableName, byte[] row, int replicaId, RegionLocateType locateType, - Callable callable, int priority, long pauseNs, int maxAttempts, long operationTimeoutNs, - long rpcTimeoutNs, int startLogErrorsCnt) { - super(retryTimer, conn, priority, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, - startLogErrorsCnt); + Callable callable, int priority, long pauseNs, long pauseForCQTBENs, int maxAttempts, + long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs, + rpcTimeoutNs, startLogErrorsCnt); this.tableName 
= tableName; this.row = row; this.replicaId = replicaId; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java index 6632ad55ad9..4c883a8332d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java @@ -76,9 +76,22 @@ public interface AsyncTableBuilder { /** * Set the base pause time for retrying. We use an exponential policy to generate sleep time when * retrying. + * @see #setRetryPauseForCQTBE(long, TimeUnit) */ AsyncTableBuilder setRetryPause(long pause, TimeUnit unit); + /** + * Set the base pause time for retrying when we hit {@code CallQueueTooBigException}. We use an + * exponential policy to generate sleep time when retrying. + *

+ * This value should be greater than the normal pause value, which can be set with the + * {@link #setRetryPause(long, TimeUnit)} method above, because {@code CallQueueTooBigException} + * usually means the server is overloaded. If you specify a smaller value here, the normal pause + * value is used for {@code CallQueueTooBigException} instead. + * @see #setRetryPause(long, TimeUnit) + */ + AsyncTableBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit); + /** * Set the max retry times for an operation. Usually it is the max attempt times minus 1. *

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java index ee571f10241..399d9ddfaff 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java @@ -45,6 +45,8 @@ abstract class AsyncTableBuilderBase protected long pauseNs; + protected long pauseForCQTBENs; + protected int maxAttempts; protected int startLogErrorsCnt; @@ -58,6 +60,7 @@ abstract class AsyncTableBuilderBase this.readRpcTimeoutNs = connConf.getReadRpcTimeoutNs(); this.writeRpcTimeoutNs = connConf.getWriteRpcTimeoutNs(); this.pauseNs = connConf.getPauseNs(); + this.pauseForCQTBENs = connConf.getPauseForCQTBENs(); this.maxAttempts = retries2Attempts(connConf.getMaxRetries()); this.startLogErrorsCnt = connConf.getStartLogErrorsCnt(); } @@ -98,6 +101,12 @@ abstract class AsyncTableBuilderBase return this; } + @Override + public AsyncTableBuilderBase setRetryPauseForCQTBE(long pause, TimeUnit unit) { + this.pauseForCQTBENs = unit.toNanos(pause); + return this; + } + @Override public AsyncTableBuilderBase setMaxAttempts(int maxAttempts) { this.maxAttempts = maxAttempts; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 35cb92276ab..0fd3cba797f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -327,6 +327,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { private final long pauseNs; + private final long pauseForCQTBENs; + private final int maxAttempts; private final int startLogErrorsCnt; @@ -341,6 +343,16 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { this.rpcTimeoutNs = 
builder.rpcTimeoutNs; this.operationTimeoutNs = builder.operationTimeoutNs; this.pauseNs = builder.pauseNs; + if (builder.pauseForCQTBENs < builder.pauseNs) { + LOG.warn( + "Configured value of pauseForCQTBENs is {} ms, which is less than" + + " the normal pause value {} ms, use the greater one instead", + TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs), + TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); + this.pauseForCQTBENs = builder.pauseNs; + } else { + this.pauseForCQTBENs = builder.pauseForCQTBENs; + } this.maxAttempts = builder.maxAttempts; this.startLogErrorsCnt = builder.startLogErrorsCnt; this.ng = connection.getNonceGenerator(); @@ -348,18 +360,18 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { private MasterRequestCallerBuilder newMasterCaller() { return this.connection.callerFactory. masterRequest() - .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) - .startLogErrorsCnt(startLogErrorsCnt); + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) + .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) + .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); } private AdminRequestCallerBuilder newAdminCaller() { return this.connection.callerFactory. 
adminRequest() - .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) - .startLogErrorsCnt(startLogErrorsCnt); + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) + .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) + .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); } @FunctionalInterface @@ -3357,10 +3369,10 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { private ServerRequestCallerBuilder newServerCaller() { return this.connection.callerFactory. serverRequest() - .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) - .startLogErrorsCnt(startLogErrorsCnt); + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) + .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) + .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index b2ca3a90b8f..8050137ba7a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -46,6 +46,8 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import 
org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; @@ -77,6 +79,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType @InterfaceAudience.Private class RawAsyncTableImpl implements AsyncTable { + private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class); + private final AsyncConnectionImpl conn; private final Timer retryTimer; @@ -99,6 +103,8 @@ class RawAsyncTableImpl implements AsyncTable { private final long pauseNs; + private final long pauseForCQTBENs; + private final int maxAttempts; private final int startLogErrorsCnt; @@ -113,6 +119,16 @@ class RawAsyncTableImpl implements AsyncTable { this.operationTimeoutNs = builder.operationTimeoutNs; this.scanTimeoutNs = builder.scanTimeoutNs; this.pauseNs = builder.pauseNs; + if (builder.pauseForCQTBENs < builder.pauseNs) { + LOG.warn( + "Configured value of pauseForCQTBENs is {} ms, which is less than" + + " the normal pause value {} ms, use the greater one instead", + TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs), + TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); + this.pauseForCQTBENs = builder.pauseNs; + } else { + this.pauseForCQTBENs = builder.pauseForCQTBENs; + } this.maxAttempts = builder.maxAttempts; this.startLogErrorsCnt = builder.startLogErrorsCnt; this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching() @@ -220,8 +236,8 @@ class RawAsyncTableImpl implements AsyncTable { return conn.callerFactory. 
single().table(tableName).row(row).priority(priority) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) - .startLogErrorsCnt(startLogErrorsCnt); + .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) + .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); } private SingleRequestCallerBuilder newCaller( @@ -451,7 +467,8 @@ class RawAsyncTableImpl implements AsyncTable { @Override public void scan(Scan scan, AdvancedScanResultConsumer consumer) { new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer, - pauseNs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start(); + pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt) + .start(); } private long resultSize2CacheSize(long maxResultSize) { @@ -521,7 +538,8 @@ class RawAsyncTableImpl implements AsyncTable { return conn.callerFactory.batch().table(tableName).actions(actions) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) - .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call(); + .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) + .startLogErrorsCnt(startLogErrorsCnt).call(); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForCallQueueTooBig.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForCallQueueTooBig.java new file mode 100644 index 00000000000..075e1bc2212 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForCallQueueTooBig.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ipc.CallRunner; +import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.ipc.RpcScheduler; +import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory; +import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import 
org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; +import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncClientPauseForCallQueueTooBig { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncClientPauseForCallQueueTooBig.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("CQTBE"); + + private static byte[] FAMILY = Bytes.toBytes("Family"); + + private static byte[] QUALIFIER = Bytes.toBytes("Qualifier"); + + private static long PAUSE_FOR_CQTBE_NS = TimeUnit.SECONDS.toNanos(1); + + private static AsyncConnection CONN; + + private static boolean FAIL = false; + + private static ConcurrentMap INVOKED = new ConcurrentHashMap<>(); + + public static final class CQTBERpcScheduler extends SimpleRpcScheduler { + + public CQTBERpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount, + int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority, + Abortable server, int highPriorityLevel) { + super(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, + metaTransitionHandler, priority, server, highPriorityLevel); + } + + @Override + public boolean dispatch(CallRunner callTask) throws InterruptedException { + if (FAIL) { + MethodDescriptor method = callTask.getRpcCall().getMethod(); + // this is for test scan, where we will send a open scanner first and then a next, and we + // expect that we hit CQTBE two times. 
+ if (INVOKED.computeIfAbsent(method, k -> new AtomicInteger(0)).getAndIncrement() % 2 == 0) { + return false; + } + } + return super.dispatch(callTask); + } + } + + public static final class CQTBERpcSchedulerFactory extends SimpleRpcSchedulerFactory { + + @Override + public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) { + int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); + return new CQTBERpcScheduler(conf, handlerCount, + conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT), + conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT), + conf.getInt(HConstants.MASTER_META_TRANSITION_HANDLER_COUNT, + HConstants.MASTER__META_TRANSITION_HANDLER_COUNT_DEFAULT), + priority, server, HConstants.QOS_THRESHOLD); + } + + } + + @BeforeClass + public static void setUp() throws Exception { + UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10); + UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, + TimeUnit.NANOSECONDS.toMillis(PAUSE_FOR_CQTBE_NS)); + UTIL.getConfiguration().setClass(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, + CQTBERpcSchedulerFactory.class, RpcSchedulerFactory.class); + UTIL.startMiniCluster(1); + CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); + } + + @AfterClass + public static void tearDown() throws Exception { + Closeables.close(CONN, true); + UTIL.shutdownMiniCluster(); + } + + @Before + public void setUpBeforeTest() throws IOException { + try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))); + } + } + FAIL = true; + } + + @After + public void tearDownAfterTest() throws IOException { + FAIL = 
false; + INVOKED.clear(); + UTIL.getAdmin().disableTable(TABLE_NAME); + UTIL.getAdmin().deleteTable(TABLE_NAME); + } + + private void assertTime(Callable callable, long time) throws Exception { + long startNs = System.nanoTime(); + callable.call(); + long costNs = System.nanoTime() - startNs; + assertTrue(costNs > time); + } + + @Test + public void testGet() throws Exception { + assertTime(() -> { + Result result = CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).get(); + assertArrayEquals(Bytes.toBytes(0), result.getValue(FAMILY, QUALIFIER)); + return null; + }, PAUSE_FOR_CQTBE_NS); + } + + @Test + public void testBatch() throws Exception { + assertTime(() -> { + List> futures = new ArrayList<>(); + try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) { + for (int i = 100; i < 110; i++) { + futures.add(mutator + .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)))); + } + } + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); + }, PAUSE_FOR_CQTBE_NS); + } + + @Test + public void testScan() throws Exception { + // we will hit CallQueueTooBigException two times so the sleep time should be twice + assertTime(() -> { + try ( + ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) { + for (int i = 0; i < 100; i++) { + Result result = scanner.next(); + assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER)); + } + assertNull(scanner.next()); + } + return null; + }, PAUSE_FOR_CQTBE_NS * 2); + } +}