From 2d038edf4cb6ecd28e5e17ff136d8d21ea71edc4 Mon Sep 17 00:00:00 2001 From: Bri Augenreich Date: Tue, 15 Nov 2022 15:41:08 -0500 Subject: [PATCH] HBASE-26809: Report client backoff time for server overloaded (#4786) Co-authored-by: Briana Augenreich Signed-off-by: Bryan Beaudreault --- .../client/AsyncBatchRpcRetryingCaller.java | 4 +++ .../hbase/client/AsyncRequestFutureImpl.java | 6 ++++ .../hbase/client/AsyncRpcRetryingCaller.java | 5 +++ ...syncScanSingleRegionRpcRetryingCaller.java | 7 +++++ .../client/ConnectionImplementation.java | 17 +++++++--- .../apache/hadoop/hbase/client/HTable.java | 5 +-- .../hbase/client/HTableMultiplexer.java | 3 +- .../hbase/client/MetricsConnection.java | 8 +++++ .../client/RpcRetryingCallerFactory.java | 31 ++++++++++++------- .../hbase/client/RpcRetryingCallerImpl.java | 15 ++++----- .../client/ScannerCallableWithReplicas.java | 12 ++++--- .../hbase/client/SecureBulkLoadClient.java | 4 +-- .../hadoop/hbase/client/TestAsyncProcess.java | 8 +++-- .../TestAsyncProcessWithRegionException.java | 3 +- .../client/TestRpcRetryingCallerImpl.java | 4 +-- .../hbase/regionserver/HRegionServer.java | 3 +- .../RegionReplicaReplicationEndpoint.java | 4 +-- .../hbase/tool/LoadIncrementalHFiles.java | 2 +- .../client/HConnectionTestingUtility.java | 5 +-- ...ionReplicaReplicationEndpointNoMaster.java | 4 +-- 20 files changed, 103 insertions(+), 47 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index 0798915c08d..7bee885586b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -487,6 +487,10 @@ class AsyncBatchRpcRetryingCaller { } else { delayNs = getPauseTime(pauseNsToUse, tries - 1); } + if (isServerOverloaded) { + Optional metrics = conn.getConnectionMetrics(); + metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS)); + } retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java index 987d86bd8aa..a91fd5af6af 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java @@ -747,6 +747,12 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.connectionConfiguration.getPauseMillis()); } + + MetricsConnection metrics = asyncProcess.connection.getConnectionMetrics(); + if (metrics != null && HBaseServerException.isServerOverloaded(throwable)) { + metrics.incrementServerOverloadedBackoffTime(backOffTime, TimeUnit.MILLISECONDS); + } + if (numAttempt > asyncProcess.startLogErrorsCnt) { // We use this value to have some logs when we have multiple failures, but not too many // logs, as errors are to be expected when a region moves, splits and so on diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java index 95ed97e1811..d0bbe4b5fa3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java @@ -139,6 +139,11 @@ public abstract class AsyncRpcRetryingCaller { delayNs = getPauseTime(pauseNsToUse, tries - 1); } tries++; + + if (HBaseServerException.isServerOverloaded(error)) { + Optional metrics = conn.getConnectionMetrics(); + metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS)); + } retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index dbaae5c26e2..2653b3c75b3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -113,6 +113,8 @@ class AsyncScanSingleRegionRpcRetryingCaller { private final Runnable completeWhenNoMoreResultsInRegion; + protected final AsyncConnectionImpl conn; + private final CompletableFuture future; private final HBaseRpcController controller; @@ -318,6 +320,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; + this.conn = conn; this.scan = scan; this.scanMetrics = scanMetrics; this.scannerId = scannerId; @@ -441,6 +444,10 @@ class AsyncScanSingleRegionRpcRetryingCaller { return; } tries++; + if (HBaseServerException.isServerOverloaded(error)) { + Optional metrics = conn.getConnectionMetrics(); + metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS)); + } retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index bb71422d6bf..b66e4242b3a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -291,10 +291,8 @@ public class ConnectionImplementation implements ClusterConnection, Closeable { this.stats = ServerStatisticTracker.create(conf); this.interceptor = new RetryingCallerInterceptorFactory(conf).build(); - this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); - this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats); + this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); - this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory); boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT); @@ -322,6 +320,10 @@ public class ConnectionImplementation implements ClusterConnection, Closeable { this.metaCache = new MetaCache(this.metrics); this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics); + this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); + this.rpcCallerFactory = + RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats, this.metrics); + this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory); // Do we publish the status? if (shouldListen) { @@ -1048,6 +1050,11 @@ public class ConnectionImplementation implements ClusterConnection, Closeable { // Only relocate the parent region if necessary relocateMeta = !(e instanceof RegionOfflineException || e instanceof NoServerForRegionException); + + if (metrics != null && HBaseServerException.isServerOverloaded(e)) { + metrics.incrementServerOverloadedBackoffTime( + ConnectionUtils.getPauseTime(pauseBase, tries), TimeUnit.MILLISECONDS); + } } finally { userRegionLock.unlock(); } @@ -2175,8 +2182,8 @@ public class ConnectionImplementation implements ClusterConnection, Closeable { @Override public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) { - return RpcRetryingCallerFactory.instantiate(conf, this.interceptor, - this.getStatisticsTracker()); + return RpcRetryingCallerFactory.instantiate(conf, this.interceptor, this.getStatisticsTracker(), + metrics); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index f36c4af0de0..80325abd7f9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -1305,8 +1305,9 @@ public class HTable implements Table { final List callbackErrorServers = new ArrayList<>(); Object[] results = new Object[execs.size()]; - AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, - RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), + AsyncProcess asyncProcess = new AsyncProcess( + connection, configuration, RpcRetryingCallerFactory.instantiate(configuration, + connection.getStatisticsTracker(), connection.getConnectionMetrics()), RpcControllerFactory.instantiate(configuration)); Batch.Callback resultsCallback = diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index c4c95d73c2b..6b54149f9f9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -423,7 +423,8 @@ public class HTableMultiplexer { this.addr = addr; this.multiplexer = htableMultiplexer; this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize); - RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf); + RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, + conn == null ? null : conn.getConnectionMetrics()); RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java index dc452bcd9d9..2ad2944cddd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java @@ -315,6 +315,7 @@ public class MetricsConnection implements StatisticTrackable { protected final Histogram numActionsPerServerHist; protected final Counter nsLookups; protected final Counter nsLookupsFailed; + protected final Timer overloadedBackoffTimer; // dynamic metrics @@ -377,6 +378,9 @@ public class MetricsConnection implements StatisticTrackable { this.nsLookups = registry.counter(name(this.getClass(), NS_LOOKUPS, scope)); this.nsLookupsFailed = registry.counter(name(this.getClass(), NS_LOOKUPS_FAILED, scope)); + this.overloadedBackoffTimer = + registry.timer(name(this.getClass(), "overloadedBackoffDurationMs", scope)); + this.reporter = JmxReporter.forRegistry(this.registry).build(); this.reporter.start(); } @@ -449,6 +453,10 @@ public class MetricsConnection implements StatisticTrackable { this.runnerStats.updateDelayInterval(interval); } + public void incrementServerOverloadedBackoffTime(long time, TimeUnit timeUnit) { + overloadedBackoffTimer.update(time, timeUnit); + } + /** * Get a metric for {@code key} from {@code map}, or create it with {@code factory}. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index a1991378228..3e8545f6a38 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -33,17 +33,20 @@ public class RpcRetryingCallerFactory { private final ConnectionConfiguration connectionConf; private final RetryingCallerInterceptor interceptor; private final int startLogErrorsCnt; + private final MetricsConnection metrics; public RpcRetryingCallerFactory(Configuration conf) { - this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR); + this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null); } - public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor) { + public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor, + MetricsConnection metrics) { this.conf = conf; this.connectionConf = new ConnectionConfiguration(conf); startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT); this.interceptor = interceptor; + this.metrics = metrics; } /** @@ -54,7 +57,7 @@ public class RpcRetryingCallerFactory { // is cheap as it does not require parsing a complex structure. return new RpcRetryingCallerImpl<>(connectionConf.getPauseMillis(), connectionConf.getPauseMillisForServerOverloaded(), connectionConf.getRetriesNumber(), - interceptor, startLogErrorsCnt, rpcTimeout); + interceptor, startLogErrorsCnt, rpcTimeout, metrics); } /** @@ -65,26 +68,30 @@ public class RpcRetryingCallerFactory { // is cheap as it does not require parsing a complex structure. return new RpcRetryingCallerImpl<>(connectionConf.getPauseMillis(), connectionConf.getPauseMillisForServerOverloaded(), connectionConf.getRetriesNumber(), - interceptor, startLogErrorsCnt, connectionConf.getRpcTimeout()); - } - - public static RpcRetryingCallerFactory instantiate(Configuration configuration) { - return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null); + interceptor, startLogErrorsCnt, connectionConf.getRpcTimeout(), metrics); } public static RpcRetryingCallerFactory instantiate(Configuration configuration, - ServerStatisticTracker stats) { - return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats); + MetricsConnection metrics) { + return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, + metrics); } public static RpcRetryingCallerFactory instantiate(Configuration configuration, - RetryingCallerInterceptor interceptor, ServerStatisticTracker stats) { + ServerStatisticTracker stats, MetricsConnection metrics) { + return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats, + metrics); + } + + public static RpcRetryingCallerFactory instantiate(Configuration configuration, + RetryingCallerInterceptor interceptor, ServerStatisticTracker stats, + MetricsConnection metrics) { String clazzName = RpcRetryingCallerFactory.class.getName(); String rpcCallerFactoryClazz = configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName); RpcRetryingCallerFactory factory; if (rpcCallerFactoryClazz.equals(clazzName)) { - factory = new RpcRetryingCallerFactory(configuration, interceptor); + factory = new RpcRetryingCallerFactory(configuration, interceptor, metrics); } else { factory = ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz, new Class[] { Configuration.class }, new Object[] { configuration }); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java index 67b12b8a34b..4d88e34ff65 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java @@ -26,6 +26,7 @@ import java.net.SocketTimeoutException; import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseServerException; @@ -63,15 +64,11 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { private final RetryingCallerInterceptor interceptor; private final RetryingCallerInterceptorContext context; private final RetryingTimeTracker tracker; + private final MetricsConnection metrics; public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries, - int startLogErrorsCnt) { - this(pause, pauseForServerOverloaded, retries, - RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0); - } - - public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries, - RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) { + RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout, + MetricsConnection metricsConnection) { this.pause = pause; this.pauseForServerOverloaded = pauseForServerOverloaded; this.maxAttempts = retries2Attempts(retries); @@ -80,6 +77,7 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { this.startLogErrorsCnt = startLogErrorsCnt; this.tracker = new RetryingTimeTracker(); this.rpcTimeout = rpcTimeout; + this.metrics = metricsConnection; } @Override @@ -158,6 +156,9 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { + t.getMessage() + " " + callable.getExceptionMessageAdditionalDetail(); throw (SocketTimeoutException) new SocketTimeoutException(msg).initCause(t); } + if (metrics != null && HBaseServerException.isServerOverloaded(t)) { + metrics.incrementServerOverloadedBackoffTime(expectedSleep, TimeUnit.MILLISECONDS); + } } finally { interceptor.updateFailureInfo(context); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index fe155136cb2..27cc4d15126 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -179,8 +179,9 @@ class ScannerCallableWithReplicas implements RetryingCallable { // We want to accomodate some RPCs for redundant replica scans (but are still in progress) ResultBoundedCompletionService> cs = new ResultBoundedCompletionService<>( - RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool, - regionReplication * 5); + RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf, + cConnection == null ? null : cConnection.getConnectionMetrics()), + pool, regionReplication * 5); AtomicBoolean done = new AtomicBoolean(false); replicaSwitched.set(false); @@ -381,8 +382,11 @@ class ScannerCallableWithReplicas implements RetryingCallable { // and we can't invoke it multiple times at the same time) this.caller = ScannerCallableWithReplicas.this.caller; if (scan.getConsistency() == Consistency.TIMELINE) { - this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf).< - Result[]> newCaller(); + this.caller = + RpcRetryingCallerFactory + .instantiate(ScannerCallableWithReplicas.this.conf, + cConnection == null ? null : cConnection.getConnectionMetrics()) + . newCaller(); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java index 825a58e7bdd..1838d78eb9f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java @@ -69,7 +69,7 @@ public class SecureBulkLoadClient { return response.getBulkToken(); } }; - return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null) + return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null, null) . newCaller().callWithRetries(callable, Integer.MAX_VALUE); } catch (Throwable throwable) { throw new IOException(throwable); @@ -91,7 +91,7 @@ public class SecureBulkLoadClient { return null; } }; - RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null). newCaller() + RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null, null). newCaller() .callWithRetries(callable, Integer.MAX_VALUE); } catch (Throwable throwable) { throw new IOException(throwable); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index de6d6012322..a55001e1627 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -246,7 +246,8 @@ public class TestAsyncProcess { } }); - return new RpcRetryingCallerImpl(100, 500, 10, 9) { + return new RpcRetryingCallerImpl(100, 500, 10, + RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null) { @Override public AbstractResponse callWithoutRetries(RetryingCallable callable, int callTimeout) throws IOException, RuntimeException { @@ -307,7 +308,7 @@ public class TestAsyncProcess { private final IOException e; public CallerWithFailure(IOException e) { - super(100, 500, 100, 9); + super(100, 500, 100, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null); this.e = e; } @@ -412,7 +413,8 @@ public class TestAsyncProcess { replicaCalls.incrementAndGet(); } - return new RpcRetryingCallerImpl(100, 500, 10, 9) { + return new RpcRetryingCallerImpl(100, 500, 10, + RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null) { @Override public MultiResponse callWithoutRetries(RetryingCallable callable, int callTimeout) throws IOException, RuntimeException { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java index ca9aabfef04..98c13761262 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java @@ -222,7 +222,8 @@ public class TestAsyncProcessWithRegionException { }); }); mr.addException(REGION_INFO.getRegionName(), IOE); - return new RpcRetryingCallerImpl(100, 500, 0, 9) { + return new RpcRetryingCallerImpl(100, 500, 0, + RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null) { @Override public AbstractResponse callWithoutRetries(RetryingCallable callable, int callTimeout) { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcRetryingCallerImpl.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcRetryingCallerImpl.java index 3d3d64f4c21..a0177190035 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcRetryingCallerImpl.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcRetryingCallerImpl.java @@ -58,8 +58,8 @@ public class TestRpcRetryingCallerImpl { long pauseMillis = 1; long specialPauseMillis = 2; - RpcRetryingCallerImpl caller = - new RpcRetryingCallerImpl<>(pauseMillis, specialPauseMillis, 2, 0); + RpcRetryingCallerImpl caller = new RpcRetryingCallerImpl<>(pauseMillis, + specialPauseMillis, 2, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 0, 0, null); RetryingCallable callable = new ThrowingCallable(CallQueueTooBigException.class, specialPauseMillis); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index deee5b1e00d..e547a462f9b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -644,7 +644,8 @@ public class HRegionServer extends Thread serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode); rpcControllerFactory = RpcControllerFactory.instantiate(this.conf); - rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf); + rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf, + clusterConnection == null ? null : clusterConnection.getConnectionMetrics()); // login the zookeeper client principal (if using security) ZKAuthentication.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java index 21a2d51326a..cf0f69372d3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java @@ -390,8 +390,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { this.sink = sink; this.connection = connection; this.operationTimeout = operationTimeout; - this.rpcRetryingCallerFactory = - RpcRetryingCallerFactory.instantiate(connection.getConfiguration()); + this.rpcRetryingCallerFactory = RpcRetryingCallerFactory + .instantiate(connection.getConfiguration(), connection.getConnectionMetrics()); this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration()); this.pool = pool; this.tableDescriptors = tableDescriptors; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java index 5e3f2e9468f..54adfd22a36 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java @@ -871,7 +871,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { List toRetry = new ArrayList<>(); try { Configuration conf = getConf(); - byte[] region = RpcRetryingCallerFactory.instantiate(conf, null). newCaller() + byte[] region = RpcRetryingCallerFactory.instantiate(conf, null, null). newCaller() .callWithRetries(serviceCallable, Integer.MAX_VALUE); if (region == null) { LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index de4e7d41ed6..139d8bf8b66 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -123,10 +123,11 @@ public class HConnectionTestingUtility { NonceGenerator ng = Mockito.mock(NonceGenerator.class); Mockito.when(c.getNonceGenerator()).thenReturn(ng); AsyncProcess asyncProcess = new AsyncProcess(c, conf, - RpcRetryingCallerFactory.instantiate(conf), RpcControllerFactory.instantiate(conf)); + RpcRetryingCallerFactory.instantiate(conf, c.getConnectionMetrics()), + RpcControllerFactory.instantiate(conf)); Mockito.when(c.getAsyncProcess()).thenReturn(asyncProcess); Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(RpcRetryingCallerFactory - .instantiate(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null)); + .instantiate(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, null)); Mockito.when(c.getRpcControllerFactory()).thenReturn(Mockito.mock(RpcControllerFactory.class)); Table t = Mockito.mock(Table.class); Mockito.when(c.getTable((TableName) Mockito.any())).thenReturn(t); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java index da95c6fa7d8..7d588712c33 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java @@ -209,8 +209,8 @@ public class TestRegionReplicaReplicationEndpointNoMaster { locations.getRegionLocation(1), locations.getRegionLocation(1).getRegionInfo(), row, Lists.newArrayList(entry), new AtomicLong()); - RpcRetryingCallerFactory factory = - RpcRetryingCallerFactory.instantiate(connection.getConfiguration()); + RpcRetryingCallerFactory factory = RpcRetryingCallerFactory + .instantiate(connection.getConfiguration(), connection.getConnectionMetrics()); factory. newCaller().callWithRetries(callable, 10000); } }