From d151af16632c5110716d6bca3db6bc3130795b7f Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Wed, 15 Mar 2023 16:08:19 +0100 Subject: [PATCH] HBASE-27652 Client-side lock contention around Configuration when using read replica regions Signed-off-by: Duo Zhang --- .../client/ConnectionImplementation.java | 8 ++-- .../apache/hadoop/hbase/client/HTable.java | 4 +- .../hbase/client/HTableMultiplexer.java | 4 +- .../client/RpcRetryingCallerFactory.java | 44 +++++++++++-------- .../RpcRetryingCallerWithReadReplicas.java | 3 +- .../client/ScannerCallableWithReplicas.java | 16 ++++--- .../hbase/client/SecureBulkLoadClient.java | 4 +- .../hadoop/hbase/client/TestAsyncProcess.java | 9 ++-- .../TestAsyncProcessWithRegionException.java | 7 ++- .../hbase/client/TestClientScanner.java | 10 +++-- .../hbase/client/TestSnapshotFromAdmin.java | 6 ++- ...onServerBulkLoadWithOldSecureEndpoint.java | 3 +- .../RegionReplicaReplicationEndpoint.java | 5 ++- .../hbase/tool/LoadIncrementalHFiles.java | 2 +- .../client/HConnectionTestingUtility.java | 11 +++-- .../client/TestConnectionImplementation.java | 4 +- .../hbase/client/TestHBaseAdminNoCluster.java | 5 ++- .../hbase/client/TestReplicaWithCluster.java | 3 +- .../quotas/TestLowLatencySpaceQuotas.java | 5 ++- .../quotas/TestSpaceQuotaOnBulkLoad.java | 9 +++- .../hadoop/hbase/quotas/TestSpaceQuotas.java | 9 +++- .../TestHRegionServerBulkLoad.java | 3 +- ...estHRegionServerBulkLoadWithOldClient.java | 3 +- ...ionReplicaReplicationEndpointNoMaster.java | 5 ++- 24 files changed, 118 insertions(+), 64 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 97c29e86a9a..0c529fb62aa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -335,8 +335,8 @@ public class ConnectionImplementation implements ClusterConnection, Closeable { this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); - this.rpcCallerFactory = - RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats, this.metrics); + this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, connectionConfig, + interceptor, this.stats, this.metrics); this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory); // Do we publish the status? @@ -2221,8 +2221,8 @@ public class ConnectionImplementation implements ClusterConnection, Closeable { @Override public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) { - return RpcRetryingCallerFactory.instantiate(conf, this.interceptor, this.getStatisticsTracker(), - metrics); + return RpcRetryingCallerFactory.instantiate(conf, connectionConfig, this.interceptor, + this.getStatisticsTracker(), metrics); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 80325abd7f9..44761c2fbf1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -1305,8 +1305,8 @@ public class HTable implements Table { final List callbackErrorServers = new ArrayList<>(); Object[] results = new Object[execs.size()]; - AsyncProcess asyncProcess = new AsyncProcess( - connection, configuration, RpcRetryingCallerFactory.instantiate(configuration, + AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, + RpcRetryingCallerFactory.instantiate(configuration, connConfiguration, connection.getStatisticsTracker(), connection.getConnectionMetrics()), RpcControllerFactory.instantiate(configuration)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index 6b54149f9f9..e81be5bcb25 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -423,8 +423,10 @@ public class HTableMultiplexer { this.addr = addr; this.multiplexer = htableMultiplexer; this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize); + final ConnectionConfiguration connectionConfig = + conn != null ? conn.getConnectionConfiguration() : new ConnectionConfiguration(conf); RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, - conn == null ? null : conn.getConnectionMetrics()); + connectionConfig, conn == null ? null : conn.getConnectionMetrics()); RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index 3e8545f6a38..c062ad43e25 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import com.google.errorprone.annotations.RestrictedApi; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; @@ -29,20 +30,18 @@ public class RpcRetryingCallerFactory { /** Configuration key for a custom {@link RpcRetryingCaller} */ public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class"; - protected final Configuration conf; private final ConnectionConfiguration connectionConf; private final RetryingCallerInterceptor interceptor; private final int startLogErrorsCnt; private final MetricsConnection metrics; - public RpcRetryingCallerFactory(Configuration conf) { - this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null); + public RpcRetryingCallerFactory(Configuration conf, ConnectionConfiguration connectionConf) { + this(conf, connectionConf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null); } - public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor, - MetricsConnection metrics) { - this.conf = conf; - this.connectionConf = new ConnectionConfiguration(conf); + public RpcRetryingCallerFactory(Configuration conf, ConnectionConfiguration connectionConf, + RetryingCallerInterceptor interceptor, MetricsConnection metrics) { + this.connectionConf = connectionConf; startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT); this.interceptor = interceptor; @@ -71,30 +70,39 @@ public class RpcRetryingCallerFactory { interceptor, startLogErrorsCnt, connectionConf.getRpcTimeout(), metrics); } + @RestrictedApi(explanation = "Should only be called on process initialization", link = "", + allowedOnPath = ".*/(HRegionServer|LoadIncrementalHFiles|SecureBulkLoadClient)\\.java") public static RpcRetryingCallerFactory instantiate(Configuration configuration, MetricsConnection metrics) { - return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, - metrics); + return instantiate(configuration, new ConnectionConfiguration(configuration), metrics); } public static RpcRetryingCallerFactory instantiate(Configuration configuration, + ConnectionConfiguration connectionConf, MetricsConnection metrics) { + return instantiate(configuration, connectionConf, + RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, metrics); + } + + public static RpcRetryingCallerFactory instantiate(Configuration configuration, + ConnectionConfiguration connectionConf, ServerStatisticTracker stats, + MetricsConnection metrics) { + return instantiate(configuration, connectionConf, + RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats, metrics); + } + + public static RpcRetryingCallerFactory instantiate(Configuration configuration, + ConnectionConfiguration connectionConf, RetryingCallerInterceptor interceptor, ServerStatisticTracker stats, MetricsConnection metrics) { - return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats, - metrics); - } - - public static RpcRetryingCallerFactory instantiate(Configuration configuration, - RetryingCallerInterceptor interceptor, ServerStatisticTracker stats, - MetricsConnection metrics) { String clazzName = RpcRetryingCallerFactory.class.getName(); String rpcCallerFactoryClazz = configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName); RpcRetryingCallerFactory factory; if (rpcCallerFactoryClazz.equals(clazzName)) { - factory = new RpcRetryingCallerFactory(configuration, interceptor, metrics); + factory = new RpcRetryingCallerFactory(configuration, connectionConf, interceptor, metrics); } else { factory = ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz, - new Class[] { Configuration.class }, new Object[] { configuration }); + new Class[] { Configuration.class, ConnectionConfiguration.class }, + new Object[] { configuration, connectionConf }); } return factory; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index 5b2208c1cc7..30718742ef7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -81,7 +81,8 @@ public class RpcRetryingCallerWithReadReplicas { this.operationTimeout = operationTimeout; this.rpcTimeout = rpcTimeout; this.timeBeforeReplicas = timeBeforeReplicas; - this.rpcRetryingCallerFactory = new RpcRetryingCallerFactory(conf); + this.rpcRetryingCallerFactory = + new RpcRetryingCallerFactory(conf, cConnection.getConnectionConfiguration()); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 27cc4d15126..227ad849c84 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -55,7 +55,7 @@ class ScannerCallableWithReplicas implements RetryingCallable { private static final Logger LOG = LoggerFactory.getLogger(ScannerCallableWithReplicas.class); volatile ScannerCallable currentScannerCallable; AtomicBoolean replicaSwitched = new AtomicBoolean(false); - final ClusterConnection cConnection; + private final ClusterConnection cConnection; protected final ExecutorService pool; protected final int timeBeforeReplicas; private final Scan scan; @@ -175,12 +175,15 @@ class ScannerCallableWithReplicas implements RetryingCallable { } regionReplication = rl.size(); } - // allocate a boundedcompletion pool of some multiple of number of replicas. - // We want to accomodate some RPCs for redundant replica scans (but are still in progress) + // allocate a bounded-completion pool of some multiple of number of replicas. + // We want to accommodate some RPCs for redundant replica scans (but are still in progress) + final ConnectionConfiguration connectionConfig = cConnection != null + ? cConnection.getConnectionConfiguration() + : new ConnectionConfiguration(ScannerCallableWithReplicas.this.conf); ResultBoundedCompletionService> cs = new ResultBoundedCompletionService<>( RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf, - cConnection == null ? null : cConnection.getConnectionMetrics()), + connectionConfig, cConnection == null ? null : cConnection.getConnectionMetrics()), pool, regionReplication * 5); AtomicBoolean done = new AtomicBoolean(false); @@ -382,9 +385,12 @@ class ScannerCallableWithReplicas implements RetryingCallable { // and we can't invoke it multiple times at the same time) this.caller = ScannerCallableWithReplicas.this.caller; if (scan.getConsistency() == Consistency.TIMELINE) { + final ConnectionConfiguration connectionConfig = cConnection != null + ? cConnection.getConnectionConfiguration() + : new ConnectionConfiguration(ScannerCallableWithReplicas.this.conf); this.caller = RpcRetryingCallerFactory - .instantiate(ScannerCallableWithReplicas.this.conf, + .instantiate(ScannerCallableWithReplicas.this.conf, connectionConfig, cConnection == null ? null : cConnection.getConnectionMetrics()) . newCaller(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java index 1838d78eb9f..825a58e7bdd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java @@ -69,7 +69,7 @@ public class SecureBulkLoadClient { return response.getBulkToken(); } }; - return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null, null) + return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null) . newCaller().callWithRetries(callable, Integer.MAX_VALUE); } catch (Throwable throwable) { throw new IOException(throwable); @@ -91,7 +91,7 @@ public class SecureBulkLoadClient { return null; } }; - RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null, null). newCaller() + RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null). newCaller() .callWithRetries(callable, Integer.MAX_VALUE); } catch (Throwable throwable) { throw new IOException(throwable); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index cbb02f59a07..b0ddd83efd3 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -184,13 +184,15 @@ public class TestAsyncProcess { } public MyAsyncProcess(ClusterConnection hc, Configuration conf) { - super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); + super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()), + new RpcControllerFactory(conf)); service = Executors.newFixedThreadPool(5); this.conf = conf; } public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) { - super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); + super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()), + new RpcControllerFactory(conf)); service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), new CountingThreadFactory(nbThreads)); } @@ -1702,7 +1704,8 @@ public class TestAsyncProcess { static class AsyncProcessForThrowableCheck extends AsyncProcess { public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) { - super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); + super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()), + new RpcControllerFactory(conf)); } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java index 98c13761262..e63215c8ea5 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java @@ -68,6 +68,8 @@ public class TestAsyncProcessWithRegionException { private static final Result EMPTY_RESULT = Result.create(null, true); private static final IOException IOE = new IOException("YOU CAN'T PASS"); private static final Configuration CONF = new Configuration(); + private static final ConnectionConfiguration CONNECTION_CONFIG = + new ConnectionConfiguration(CONF); private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE"); private static final byte[] GOOD_ROW = Bytes.toBytes("GOOD_ROW"); private static final byte[] BAD_ROW = Bytes.toBytes("BAD_ROW"); @@ -175,7 +177,7 @@ public class TestAsyncProcessWithRegionException { Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); Mockito.when(hc.getNonceGenerator()).thenReturn(ng); Mockito.when(hc.getConfiguration()).thenReturn(CONF); - Mockito.when(hc.getConnectionConfiguration()).thenReturn(new ConnectionConfiguration(CONF)); + Mockito.when(hc.getConnectionConfiguration()).thenReturn(CONNECTION_CONFIG); setMockLocation(hc, GOOD_ROW, new RegionLocations(REGION_LOCATION)); setMockLocation(hc, BAD_ROW, new RegionLocations(REGION_LOCATION)); Mockito @@ -196,7 +198,8 @@ public class TestAsyncProcessWithRegionException { private final ExecutorService service = Executors.newFixedThreadPool(5); MyAsyncProcess(ClusterConnection hc, Configuration conf) { - super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); + super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()), + new RpcControllerFactory(conf)); } public AsyncRequestFuture submit(TableName tableName, List rows) diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java index ffc9caa27d7..16f4f687dfe 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java @@ -70,6 +70,7 @@ public class TestClientScanner { Scan scan; ExecutorService pool; Configuration conf; + ConnectionConfiguration connectionConfig; ClusterConnection clusterConn; RpcRetryingCallerFactory rpcFactory; @@ -86,7 +87,9 @@ public class TestClientScanner { pool = Executors.newSingleThreadExecutor(); scan = new Scan(); conf = new Configuration(); + connectionConfig = new ConnectionConfiguration(conf); Mockito.when(clusterConn.getConfiguration()).thenReturn(conf); + Mockito.when(clusterConn.getConnectionConfiguration()).thenReturn(connectionConfig); } @After @@ -473,7 +476,7 @@ public class TestClientScanner { // Mock a caller which calls the callable for ScannerCallableWithReplicas, // but throws an exception for the actual scanner calls via callWithRetries. - rpcFactory = new MockRpcRetryingCallerFactory(conf); + rpcFactory = new MockRpcRetryingCallerFactory(conf, connectionConfig); conf.set(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, MockRpcRetryingCallerFactory.class.getName()); @@ -496,8 +499,9 @@ public class TestClientScanner { public static class MockRpcRetryingCallerFactory extends RpcRetryingCallerFactory { - public MockRpcRetryingCallerFactory(Configuration conf) { - super(conf); + public MockRpcRetryingCallerFactory(Configuration conf, + ConnectionConfiguration connectionConf) { + super(conf, connectionConf); } @Override diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java index 8815488f35a..06bb71937f1 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java @@ -87,13 +87,14 @@ public class TestSnapshotFromAdmin { // setup the conf to match the expected properties conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries); conf.setLong("hbase.client.pause", pauseTime); + ConnectionConfiguration connectionConfig = new ConnectionConfiguration(conf); // mock the master admin to our mock MasterKeepAliveConnection mockMaster = Mockito.mock(MasterKeepAliveConnection.class); Mockito.when(mockConnection.getConfiguration()).thenReturn(conf); Mockito.when(mockConnection.getMaster()).thenReturn(mockMaster); // we need a real retrying caller - RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf); + RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf, connectionConfig); RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class); Mockito.when(controllerFactory.newController()) .thenReturn(Mockito.mock(HBaseRpcController.class)); @@ -134,9 +135,10 @@ public class TestSnapshotFromAdmin { public void testValidateSnapshotName() throws Exception { ConnectionImplementation mockConnection = Mockito.mock(ConnectionImplementation.class); Configuration conf = HBaseConfiguration.create(); + ConnectionConfiguration connectionConfig = new ConnectionConfiguration(conf); Mockito.when(mockConnection.getConfiguration()).thenReturn(conf); // we need a real retrying caller - RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf); + RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf, connectionConfig); RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class); Mockito.when(controllerFactory.newController()) .thenReturn(Mockito.mock(HBaseRpcController.class)); diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java index 60aa57c7e68..5ffd7b92b79 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java @@ -125,7 +125,8 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS return null; } }; - RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf); + RpcRetryingCallerFactory factory = + new RpcRetryingCallerFactory(conf, conn.getConnectionConfiguration()); RpcRetryingCaller caller = factory. newCaller(); caller.callWithRetries(callable, Integer.MAX_VALUE); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java index cf0f69372d3..754811ce0e0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java @@ -390,8 +390,9 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { this.sink = sink; this.connection = connection; this.operationTimeout = operationTimeout; - this.rpcRetryingCallerFactory = RpcRetryingCallerFactory - .instantiate(connection.getConfiguration(), connection.getConnectionMetrics()); + this.rpcRetryingCallerFactory = + RpcRetryingCallerFactory.instantiate(connection.getConfiguration(), + connection.getConnectionConfiguration(), connection.getConnectionMetrics()); this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration()); this.pool = pool; this.tableDescriptors = tableDescriptors; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java index 91d80efab41..d94cf1dba5f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java @@ -872,7 +872,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { List toRetry = new ArrayList<>(); try { Configuration conf = getConf(); - byte[] region = RpcRetryingCallerFactory.instantiate(conf, null, null). newCaller() + byte[] region = RpcRetryingCallerFactory.instantiate(conf, null). newCaller() .callWithRetries(serviceCallable, Integer.MAX_VALUE); if (region == null) { LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index 139d8bf8b66..a1ff19c2faa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -62,12 +62,14 @@ public class HConnectionTestingUtility { */ public static ClusterConnection getMockedConnection(final Configuration conf) throws ZooKeeperConnectionException { + ConnectionConfiguration connectionConfig = new ConnectionConfiguration(conf); ConnectionImplementation connection = Mockito.mock(ConnectionImplementation.class); Mockito.when(connection.getConfiguration()).thenReturn(conf); + Mockito.when(connection.getConnectionConfiguration()).thenReturn(connectionConfig); Mockito.when(connection.getRpcControllerFactory()) .thenReturn(Mockito.mock(RpcControllerFactory.class)); // we need a real retrying caller - RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf); + RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf, connectionConfig); Mockito.when(connection.getRpcRetryingCallerFactory()).thenReturn(callerFactory); return connection; } @@ -123,11 +125,12 @@ public class HConnectionTestingUtility { NonceGenerator ng = Mockito.mock(NonceGenerator.class); Mockito.when(c.getNonceGenerator()).thenReturn(ng); AsyncProcess asyncProcess = new AsyncProcess(c, conf, - RpcRetryingCallerFactory.instantiate(conf, c.getConnectionMetrics()), + RpcRetryingCallerFactory.instantiate(conf, connectionConfiguration, c.getConnectionMetrics()), RpcControllerFactory.instantiate(conf)); Mockito.when(c.getAsyncProcess()).thenReturn(asyncProcess); - Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(RpcRetryingCallerFactory - .instantiate(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, null)); + Mockito.when(c.getNewRpcRetryingCallerFactory(conf)) + .thenReturn(RpcRetryingCallerFactory.instantiate(conf, connectionConfiguration, + RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, null)); Mockito.when(c.getRpcControllerFactory()).thenReturn(Mockito.mock(RpcControllerFactory.class)); Table t = Mockito.mock(Table.class); Mockito.when(c.getTable((TableName) Mockito.any())).thenReturn(t); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java index 71a56c19418..fd80682d7d3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java @@ -1138,8 +1138,8 @@ public class TestConnectionImplementation { private final Class exceptionClass; - public ThrowingCallerFactory(Configuration conf) { - super(conf); + public ThrowingCallerFactory(Configuration conf, ConnectionConfiguration connectionConfig) { + super(conf, connectionConfig); this.exceptionClass = conf.getClass("testSpecialPauseException", null, HBaseServerException.class); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java index ad21d1b39c1..9c6d17fc607 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java @@ -295,6 +295,8 @@ public class TestHBaseAdminNoCluster { ClusterConnection connection = mock(ClusterConnection.class); when(connection.getConfiguration()).thenReturn(configuration); + ConnectionConfiguration connectionConfig = new ConnectionConfiguration(configuration); + when(connection.getConnectionConfiguration()).thenReturn(connectionConfig); MasterKeepAliveConnection masterAdmin = Mockito.mock(MasterKeepAliveConnection.class, new Answer() { @Override @@ -312,7 +314,8 @@ public class TestHBaseAdminNoCluster { .thenReturn(Mockito.mock(HBaseRpcController.class)); // we need a real retrying caller - RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(configuration); + RpcRetryingCallerFactory callerFactory = + new RpcRetryingCallerFactory(configuration, connectionConfig); Mockito.when(connection.getRpcRetryingCallerFactory()).thenReturn(callerFactory); Admin admin = null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java index 448152b454d..1646a3b81a3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java @@ -504,7 +504,8 @@ public class TestReplicaWithCluster { return null; } }; - RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration()); + RpcRetryingCallerFactory factory = + new RpcRetryingCallerFactory(HTU.getConfiguration(), conn.getConnectionConfiguration()); RpcRetryingCaller caller = factory.newCaller(); caller.callWithRetries(callable, 10000); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestLowLatencySpaceQuotas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestLowLatencySpaceQuotas.java index 3bd7243e5c3..9da13aa5a25 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestLowLatencySpaceQuotas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestLowLatencySpaceQuotas.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ClientServiceCallable; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.RpcRetryingCaller; @@ -236,7 +237,9 @@ public class TestLowLatencySpaceQuotas { totalSize += file.getLen(); } - RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration()); + final ClusterConnection clusterConn = (ClusterConnection) conn; + RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(), + clusterConn.getConnectionConfiguration()); RpcRetryingCaller caller = factory. newCaller(); caller.callWithRetries(callable, Integer.MAX_VALUE); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaOnBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaOnBulkLoad.java index bae13cce052..0d2dc99e211 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaOnBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaOnBulkLoad.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ClientServiceCallable; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.ResultScanner; @@ -98,7 +99,9 @@ public class TestSpaceQuotaOnBulkLoad { // The table is now in violation. Try to do a bulk load ClientServiceCallable callable = helper.generateFileToLoad(tableName, 1, 50); - RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration()); + ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection(); + RpcRetryingCallerFactory factory = + new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(), conn.getConnectionConfiguration()); RpcRetryingCaller caller = factory. newCaller(); try { caller.callWithRetries(callable, Integer.MAX_VALUE); @@ -157,7 +160,9 @@ public class TestSpaceQuotaOnBulkLoad { LOG.debug(file.getPath() + " -> " + file.getLen() + "B"); } - RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration()); + ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection(); + RpcRetryingCallerFactory factory = + new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(), conn.getConnectionConfiguration()); RpcRetryingCaller caller = factory. newCaller(); try { caller.callWithRetries(callable, Integer.MAX_VALUE); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java index 99d575eb529..7fad509d81f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.ClientServiceCallable; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Increment; @@ -242,7 +243,9 @@ public class TestSpaceQuotas { // The table is now in violation. Try to do a bulk load ClientServiceCallable callable = helper.generateFileToLoad(tableName, 1, 50); - RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration()); + ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection(); + RpcRetryingCallerFactory factory = + new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(), conn.getConnectionConfiguration()); RpcRetryingCaller caller = factory.newCaller(); try { caller.callWithRetries(callable, Integer.MAX_VALUE); @@ -301,7 +304,9 @@ public class TestSpaceQuotas { LOG.debug(file.getPath() + " -> " + file.getLen() + "B"); } - RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration()); + ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection(); + RpcRetryingCallerFactory factory = + new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(), conn.getConnectionConfiguration()); RpcRetryingCaller caller = factory.newCaller(); try { caller.callWithRetries(callable, Integer.MAX_VALUE); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java index 354e0124b5b..751d782f601 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java @@ -230,7 +230,8 @@ public class TestHRegionServerBulkLoad { return null; } }; - RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf); + RpcRetryingCallerFactory factory = + new RpcRetryingCallerFactory(conf, conn.getConnectionConfiguration()); RpcRetryingCaller caller = factory. newCaller(); caller.callWithRetries(callable, Integer.MAX_VALUE); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java index 69e4384f566..b4e00a90df2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java @@ -112,7 +112,8 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul return null; } }; - RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf); + RpcRetryingCallerFactory factory = + new RpcRetryingCallerFactory(conf, conn.getConnectionConfiguration()); RpcRetryingCaller caller = factory. newCaller(); caller.callWithRetries(callable, Integer.MAX_VALUE); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java index 7d588712c33..91e04fe6e10 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java @@ -209,8 +209,9 @@ public class TestRegionReplicaReplicationEndpointNoMaster { locations.getRegionLocation(1), locations.getRegionLocation(1).getRegionInfo(), row, Lists.newArrayList(entry), new AtomicLong()); - RpcRetryingCallerFactory factory = RpcRetryingCallerFactory - .instantiate(connection.getConfiguration(), connection.getConnectionMetrics()); + RpcRetryingCallerFactory factory = + RpcRetryingCallerFactory.instantiate(connection.getConfiguration(), + connection.getConnectionConfiguration(), connection.getConnectionMetrics()); factory. newCaller().callWithRetries(callable, 10000); } }