HBASE-27652 Client-side lock contention around Configuration when using read replica regions (#5036)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Nick Dimiduk 2023-03-15 16:07:48 +01:00 committed by GitHub
parent ef8a981f22
commit dbb78388e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 114 additions and 62 deletions

View File

@ -344,8 +344,8 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics); this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.rpcCallerFactory = this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, connectionConfig,
RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats, this.metrics); interceptor, this.stats, this.metrics);
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory); this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
// Do we publish the status? // Do we publish the status?
@ -2250,8 +2250,8 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
@Override @Override
public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) { public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
return RpcRetryingCallerFactory.instantiate(conf, this.interceptor, this.getStatisticsTracker(), return RpcRetryingCallerFactory.instantiate(conf, connectionConfig, this.interceptor,
metrics); this.getStatisticsTracker(), metrics);
} }
@Override @Override

View File

@ -1305,8 +1305,8 @@ public class HTable implements Table {
final List<String> callbackErrorServers = new ArrayList<>(); final List<String> callbackErrorServers = new ArrayList<>();
Object[] results = new Object[execs.size()]; Object[] results = new Object[execs.size()];
AsyncProcess asyncProcess = new AsyncProcess( AsyncProcess asyncProcess = new AsyncProcess(connection, configuration,
connection, configuration, RpcRetryingCallerFactory.instantiate(configuration, RpcRetryingCallerFactory.instantiate(configuration, connConfiguration,
connection.getStatisticsTracker(), connection.getConnectionMetrics()), connection.getStatisticsTracker(), connection.getConnectionMetrics()),
RpcControllerFactory.instantiate(configuration)); RpcControllerFactory.instantiate(configuration));

View File

@ -423,8 +423,10 @@ public class HTableMultiplexer {
this.addr = addr; this.addr = addr;
this.multiplexer = htableMultiplexer; this.multiplexer = htableMultiplexer;
this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize); this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
final ConnectionConfiguration connectionConfig =
conn != null ? conn.getConnectionConfiguration() : new ConnectionConfiguration(conf);
RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf,
conn == null ? null : conn.getConnectionMetrics()); connectionConfig, conn == null ? null : conn.getConnectionMetrics());
RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf); RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import com.google.errorprone.annotations.RestrictedApi;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -29,20 +30,18 @@ public class RpcRetryingCallerFactory {
/** Configuration key for a custom {@link RpcRetryingCaller} */ /** Configuration key for a custom {@link RpcRetryingCaller} */
public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class"; public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class";
protected final Configuration conf;
private final ConnectionConfiguration connectionConf; private final ConnectionConfiguration connectionConf;
private final RetryingCallerInterceptor interceptor; private final RetryingCallerInterceptor interceptor;
private final int startLogErrorsCnt; private final int startLogErrorsCnt;
private final MetricsConnection metrics; private final MetricsConnection metrics;
public RpcRetryingCallerFactory(Configuration conf) { public RpcRetryingCallerFactory(Configuration conf, ConnectionConfiguration connectionConf) {
this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null); this(conf, connectionConf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
} }
public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor, public RpcRetryingCallerFactory(Configuration conf, ConnectionConfiguration connectionConf,
MetricsConnection metrics) { RetryingCallerInterceptor interceptor, MetricsConnection metrics) {
this.conf = conf; this.connectionConf = connectionConf;
this.connectionConf = new ConnectionConfiguration(conf);
startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT); AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
this.interceptor = interceptor; this.interceptor = interceptor;
@ -71,30 +70,39 @@ public class RpcRetryingCallerFactory {
interceptor, startLogErrorsCnt, connectionConf.getRpcTimeout(), metrics); interceptor, startLogErrorsCnt, connectionConf.getRpcTimeout(), metrics);
} }
@RestrictedApi(explanation = "Should only be called on process initialization", link = "",
allowedOnPath = ".*/(HRegionServer|LoadIncrementalHFiles|SecureBulkLoadClient)\\.java")
public static RpcRetryingCallerFactory instantiate(Configuration configuration, public static RpcRetryingCallerFactory instantiate(Configuration configuration,
MetricsConnection metrics) { MetricsConnection metrics) {
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, return instantiate(configuration, new ConnectionConfiguration(configuration), metrics);
metrics);
} }
public static RpcRetryingCallerFactory instantiate(Configuration configuration, public static RpcRetryingCallerFactory instantiate(Configuration configuration,
ConnectionConfiguration connectionConf, MetricsConnection metrics) {
return instantiate(configuration, connectionConf,
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, metrics);
}
public static RpcRetryingCallerFactory instantiate(Configuration configuration,
ConnectionConfiguration connectionConf, ServerStatisticTracker stats,
MetricsConnection metrics) {
return instantiate(configuration, connectionConf,
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats, metrics);
}
public static RpcRetryingCallerFactory instantiate(Configuration configuration,
ConnectionConfiguration connectionConf, RetryingCallerInterceptor interceptor,
ServerStatisticTracker stats, MetricsConnection metrics) { ServerStatisticTracker stats, MetricsConnection metrics) {
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats,
metrics);
}
public static RpcRetryingCallerFactory instantiate(Configuration configuration,
RetryingCallerInterceptor interceptor, ServerStatisticTracker stats,
MetricsConnection metrics) {
String clazzName = RpcRetryingCallerFactory.class.getName(); String clazzName = RpcRetryingCallerFactory.class.getName();
String rpcCallerFactoryClazz = String rpcCallerFactoryClazz =
configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName); configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName);
RpcRetryingCallerFactory factory; RpcRetryingCallerFactory factory;
if (rpcCallerFactoryClazz.equals(clazzName)) { if (rpcCallerFactoryClazz.equals(clazzName)) {
factory = new RpcRetryingCallerFactory(configuration, interceptor, metrics); factory = new RpcRetryingCallerFactory(configuration, connectionConf, interceptor, metrics);
} else { } else {
factory = ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz, factory = ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
new Class[] { Configuration.class }, new Object[] { configuration }); new Class[] { Configuration.class, ConnectionConfiguration.class },
new Object[] { configuration, connectionConf });
} }
return factory; return factory;
} }

View File

@ -81,7 +81,8 @@ public class RpcRetryingCallerWithReadReplicas {
this.operationTimeout = operationTimeout; this.operationTimeout = operationTimeout;
this.rpcTimeout = rpcTimeout; this.rpcTimeout = rpcTimeout;
this.timeBeforeReplicas = timeBeforeReplicas; this.timeBeforeReplicas = timeBeforeReplicas;
this.rpcRetryingCallerFactory = new RpcRetryingCallerFactory(conf); this.rpcRetryingCallerFactory =
new RpcRetryingCallerFactory(conf, cConnection.getConnectionConfiguration());
} }
/** /**

View File

@ -55,7 +55,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
private static final Logger LOG = LoggerFactory.getLogger(ScannerCallableWithReplicas.class); private static final Logger LOG = LoggerFactory.getLogger(ScannerCallableWithReplicas.class);
volatile ScannerCallable currentScannerCallable; volatile ScannerCallable currentScannerCallable;
AtomicBoolean replicaSwitched = new AtomicBoolean(false); AtomicBoolean replicaSwitched = new AtomicBoolean(false);
final ClusterConnection cConnection; private final ClusterConnection cConnection;
protected final ExecutorService pool; protected final ExecutorService pool;
protected final int timeBeforeReplicas; protected final int timeBeforeReplicas;
private final Scan scan; private final Scan scan;
@ -175,12 +175,15 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
} }
regionReplication = rl.size(); regionReplication = rl.size();
} }
// allocate a boundedcompletion pool of some multiple of number of replicas. // allocate a bounded-completion pool of some multiple of number of replicas.
// We want to accomodate some RPCs for redundant replica scans (but are still in progress) // We want to accommodate some RPCs for redundant replica scans (but are still in progress)
final ConnectionConfiguration connectionConfig = cConnection != null
? cConnection.getConnectionConfiguration()
: new ConnectionConfiguration(ScannerCallableWithReplicas.this.conf);
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs = ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
new ResultBoundedCompletionService<>( new ResultBoundedCompletionService<>(
RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf, RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf,
cConnection == null ? null : cConnection.getConnectionMetrics()), connectionConfig, cConnection == null ? null : cConnection.getConnectionMetrics()),
pool, regionReplication * 5); pool, regionReplication * 5);
AtomicBoolean done = new AtomicBoolean(false); AtomicBoolean done = new AtomicBoolean(false);
@ -382,9 +385,12 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
// and we can't invoke it multiple times at the same time) // and we can't invoke it multiple times at the same time)
this.caller = ScannerCallableWithReplicas.this.caller; this.caller = ScannerCallableWithReplicas.this.caller;
if (scan.getConsistency() == Consistency.TIMELINE) { if (scan.getConsistency() == Consistency.TIMELINE) {
final ConnectionConfiguration connectionConfig = cConnection != null
? cConnection.getConnectionConfiguration()
: new ConnectionConfiguration(ScannerCallableWithReplicas.this.conf);
this.caller = this.caller =
RpcRetryingCallerFactory RpcRetryingCallerFactory
.instantiate(ScannerCallableWithReplicas.this.conf, .instantiate(ScannerCallableWithReplicas.this.conf, connectionConfig,
cConnection == null ? null : cConnection.getConnectionMetrics()) cConnection == null ? null : cConnection.getConnectionMetrics())
.<Result[]> newCaller(); .<Result[]> newCaller();
} }

View File

@ -69,7 +69,7 @@ public class SecureBulkLoadClient {
return response.getBulkToken(); return response.getBulkToken();
} }
}; };
return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null, null) return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
.<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE); .<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
} catch (Throwable throwable) { } catch (Throwable throwable) {
throw new IOException(throwable); throw new IOException(throwable);
@ -91,7 +91,7 @@ public class SecureBulkLoadClient {
return null; return null;
} }
}; };
RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null, null).<Void> newCaller() RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null).<Void> newCaller()
.callWithRetries(callable, Integer.MAX_VALUE); .callWithRetries(callable, Integer.MAX_VALUE);
} catch (Throwable throwable) { } catch (Throwable throwable) {
throw new IOException(throwable); throw new IOException(throwable);

View File

@ -184,13 +184,15 @@ public class TestAsyncProcess {
} }
public MyAsyncProcess(ClusterConnection hc, Configuration conf) { public MyAsyncProcess(ClusterConnection hc, Configuration conf) {
super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()),
new RpcControllerFactory(conf));
service = Executors.newFixedThreadPool(5); service = Executors.newFixedThreadPool(5);
this.conf = conf; this.conf = conf;
} }
public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) { public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()),
new RpcControllerFactory(conf));
service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<>(),
new CountingThreadFactory(nbThreads)); new CountingThreadFactory(nbThreads));
} }
@ -1702,7 +1704,8 @@ public class TestAsyncProcess {
static class AsyncProcessForThrowableCheck extends AsyncProcess { static class AsyncProcessForThrowableCheck extends AsyncProcess {
public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) { public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) {
super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()),
new RpcControllerFactory(conf));
} }
} }

View File

@ -68,6 +68,8 @@ public class TestAsyncProcessWithRegionException {
private static final Result EMPTY_RESULT = Result.create(null, true); private static final Result EMPTY_RESULT = Result.create(null, true);
private static final IOException IOE = new IOException("YOU CAN'T PASS"); private static final IOException IOE = new IOException("YOU CAN'T PASS");
private static final Configuration CONF = new Configuration(); private static final Configuration CONF = new Configuration();
private static final ConnectionConfiguration CONNECTION_CONFIG =
new ConnectionConfiguration(CONF);
private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE"); private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE");
private static final byte[] GOOD_ROW = Bytes.toBytes("GOOD_ROW"); private static final byte[] GOOD_ROW = Bytes.toBytes("GOOD_ROW");
private static final byte[] BAD_ROW = Bytes.toBytes("BAD_ROW"); private static final byte[] BAD_ROW = Bytes.toBytes("BAD_ROW");
@ -175,7 +177,7 @@ public class TestAsyncProcessWithRegionException {
Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
Mockito.when(hc.getNonceGenerator()).thenReturn(ng); Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
Mockito.when(hc.getConfiguration()).thenReturn(CONF); Mockito.when(hc.getConfiguration()).thenReturn(CONF);
Mockito.when(hc.getConnectionConfiguration()).thenReturn(new ConnectionConfiguration(CONF)); Mockito.when(hc.getConnectionConfiguration()).thenReturn(CONNECTION_CONFIG);
setMockLocation(hc, GOOD_ROW, new RegionLocations(REGION_LOCATION)); setMockLocation(hc, GOOD_ROW, new RegionLocations(REGION_LOCATION));
setMockLocation(hc, BAD_ROW, new RegionLocations(REGION_LOCATION)); setMockLocation(hc, BAD_ROW, new RegionLocations(REGION_LOCATION));
Mockito Mockito
@ -196,7 +198,8 @@ public class TestAsyncProcessWithRegionException {
private final ExecutorService service = Executors.newFixedThreadPool(5); private final ExecutorService service = Executors.newFixedThreadPool(5);
MyAsyncProcess(ClusterConnection hc, Configuration conf) { MyAsyncProcess(ClusterConnection hc, Configuration conf) {
super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf)); super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()),
new RpcControllerFactory(conf));
} }
public AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows) public AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows)

View File

@ -70,6 +70,7 @@ public class TestClientScanner {
Scan scan; Scan scan;
ExecutorService pool; ExecutorService pool;
Configuration conf; Configuration conf;
ConnectionConfiguration connectionConfig;
ClusterConnection clusterConn; ClusterConnection clusterConn;
RpcRetryingCallerFactory rpcFactory; RpcRetryingCallerFactory rpcFactory;
@ -86,7 +87,9 @@ public class TestClientScanner {
pool = Executors.newSingleThreadExecutor(); pool = Executors.newSingleThreadExecutor();
scan = new Scan(); scan = new Scan();
conf = new Configuration(); conf = new Configuration();
connectionConfig = new ConnectionConfiguration(conf);
Mockito.when(clusterConn.getConfiguration()).thenReturn(conf); Mockito.when(clusterConn.getConfiguration()).thenReturn(conf);
Mockito.when(clusterConn.getConnectionConfiguration()).thenReturn(connectionConfig);
} }
@After @After
@ -473,7 +476,7 @@ public class TestClientScanner {
// Mock a caller which calls the callable for ScannerCallableWithReplicas, // Mock a caller which calls the callable for ScannerCallableWithReplicas,
// but throws an exception for the actual scanner calls via callWithRetries. // but throws an exception for the actual scanner calls via callWithRetries.
rpcFactory = new MockRpcRetryingCallerFactory(conf); rpcFactory = new MockRpcRetryingCallerFactory(conf, connectionConfig);
conf.set(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, conf.set(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY,
MockRpcRetryingCallerFactory.class.getName()); MockRpcRetryingCallerFactory.class.getName());
@ -496,8 +499,9 @@ public class TestClientScanner {
public static class MockRpcRetryingCallerFactory extends RpcRetryingCallerFactory { public static class MockRpcRetryingCallerFactory extends RpcRetryingCallerFactory {
public MockRpcRetryingCallerFactory(Configuration conf) { public MockRpcRetryingCallerFactory(Configuration conf,
super(conf); ConnectionConfiguration connectionConf) {
super(conf, connectionConf);
} }
@Override @Override

View File

@ -125,7 +125,8 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
return null; return null;
} }
}; };
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf); RpcRetryingCallerFactory factory =
new RpcRetryingCallerFactory(conf, conn.getConnectionConfiguration());
RpcRetryingCaller<Void> caller = factory.<Void> newCaller(); RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
caller.callWithRetries(callable, Integer.MAX_VALUE); caller.callWithRetries(callable, Integer.MAX_VALUE);

View File

@ -390,8 +390,9 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
this.sink = sink; this.sink = sink;
this.connection = connection; this.connection = connection;
this.operationTimeout = operationTimeout; this.operationTimeout = operationTimeout;
this.rpcRetryingCallerFactory = RpcRetryingCallerFactory this.rpcRetryingCallerFactory =
.instantiate(connection.getConfiguration(), connection.getConnectionMetrics()); RpcRetryingCallerFactory.instantiate(connection.getConfiguration(),
connection.getConnectionConfiguration(), connection.getConnectionMetrics());
this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration()); this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
this.pool = pool; this.pool = pool;
this.tableDescriptors = tableDescriptors; this.tableDescriptors = tableDescriptors;

View File

@ -872,7 +872,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
List<LoadQueueItem> toRetry = new ArrayList<>(); List<LoadQueueItem> toRetry = new ArrayList<>();
try { try {
Configuration conf = getConf(); Configuration conf = getConf();
byte[] region = RpcRetryingCallerFactory.instantiate(conf, null, null).<byte[]> newCaller() byte[] region = RpcRetryingCallerFactory.instantiate(conf, null).<byte[]> newCaller()
.callWithRetries(serviceCallable, Integer.MAX_VALUE); .callWithRetries(serviceCallable, Integer.MAX_VALUE);
if (region == null) { if (region == null) {
LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first)

View File

@ -62,12 +62,14 @@ public class HConnectionTestingUtility {
*/ */
public static ClusterConnection getMockedConnection(final Configuration conf) public static ClusterConnection getMockedConnection(final Configuration conf)
throws ZooKeeperConnectionException { throws ZooKeeperConnectionException {
ConnectionConfiguration connectionConfig = new ConnectionConfiguration(conf);
ConnectionImplementation connection = Mockito.mock(ConnectionImplementation.class); ConnectionImplementation connection = Mockito.mock(ConnectionImplementation.class);
Mockito.when(connection.getConfiguration()).thenReturn(conf); Mockito.when(connection.getConfiguration()).thenReturn(conf);
Mockito.when(connection.getConnectionConfiguration()).thenReturn(connectionConfig);
Mockito.when(connection.getRpcControllerFactory()) Mockito.when(connection.getRpcControllerFactory())
.thenReturn(Mockito.mock(RpcControllerFactory.class)); .thenReturn(Mockito.mock(RpcControllerFactory.class));
// we need a real retrying caller // we need a real retrying caller
RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf); RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf, connectionConfig);
Mockito.when(connection.getRpcRetryingCallerFactory()).thenReturn(callerFactory); Mockito.when(connection.getRpcRetryingCallerFactory()).thenReturn(callerFactory);
return connection; return connection;
} }
@ -123,11 +125,12 @@ public class HConnectionTestingUtility {
NonceGenerator ng = Mockito.mock(NonceGenerator.class); NonceGenerator ng = Mockito.mock(NonceGenerator.class);
Mockito.when(c.getNonceGenerator()).thenReturn(ng); Mockito.when(c.getNonceGenerator()).thenReturn(ng);
AsyncProcess asyncProcess = new AsyncProcess(c, conf, AsyncProcess asyncProcess = new AsyncProcess(c, conf,
RpcRetryingCallerFactory.instantiate(conf, c.getConnectionMetrics()), RpcRetryingCallerFactory.instantiate(conf, connectionConfiguration, c.getConnectionMetrics()),
RpcControllerFactory.instantiate(conf)); RpcControllerFactory.instantiate(conf));
Mockito.when(c.getAsyncProcess()).thenReturn(asyncProcess); Mockito.when(c.getAsyncProcess()).thenReturn(asyncProcess);
Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(RpcRetryingCallerFactory Mockito.when(c.getNewRpcRetryingCallerFactory(conf))
.instantiate(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, null)); .thenReturn(RpcRetryingCallerFactory.instantiate(conf, connectionConfiguration,
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, null));
Mockito.when(c.getRpcControllerFactory()).thenReturn(Mockito.mock(RpcControllerFactory.class)); Mockito.when(c.getRpcControllerFactory()).thenReturn(Mockito.mock(RpcControllerFactory.class));
Table t = Mockito.mock(Table.class); Table t = Mockito.mock(Table.class);
Mockito.when(c.getTable((TableName) Mockito.any())).thenReturn(t); Mockito.when(c.getTable((TableName) Mockito.any())).thenReturn(t);

View File

@ -1138,8 +1138,8 @@ public class TestConnectionImplementation {
private final Class<? extends HBaseServerException> exceptionClass; private final Class<? extends HBaseServerException> exceptionClass;
public ThrowingCallerFactory(Configuration conf) { public ThrowingCallerFactory(Configuration conf, ConnectionConfiguration connectionConfig) {
super(conf); super(conf, connectionConfig);
this.exceptionClass = this.exceptionClass =
conf.getClass("testSpecialPauseException", null, HBaseServerException.class); conf.getClass("testSpecialPauseException", null, HBaseServerException.class);
} }

View File

@ -272,6 +272,8 @@ public class TestHBaseAdminNoCluster {
ClusterConnection connection = mock(ClusterConnection.class); ClusterConnection connection = mock(ClusterConnection.class);
when(connection.getConfiguration()).thenReturn(configuration); when(connection.getConfiguration()).thenReturn(configuration);
ConnectionConfiguration connectionConfig = new ConnectionConfiguration(configuration);
when(connection.getConnectionConfiguration()).thenReturn(connectionConfig);
MasterKeepAliveConnection masterAdmin = mock(MasterKeepAliveConnection.class, new Answer() { MasterKeepAliveConnection masterAdmin = mock(MasterKeepAliveConnection.class, new Answer() {
@Override @Override
public Object answer(InvocationOnMock invocation) throws Throwable { public Object answer(InvocationOnMock invocation) throws Throwable {
@ -287,7 +289,8 @@ public class TestHBaseAdminNoCluster {
when(rpcControllerFactory.newController()).thenReturn(mock(HBaseRpcController.class)); when(rpcControllerFactory.newController()).thenReturn(mock(HBaseRpcController.class));
// we need a real retrying caller // we need a real retrying caller
RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(configuration); RpcRetryingCallerFactory callerFactory =
new RpcRetryingCallerFactory(configuration, connectionConfig);
when(connection.getRpcRetryingCallerFactory()).thenReturn(callerFactory); when(connection.getRpcRetryingCallerFactory()).thenReturn(callerFactory);
Admin admin = null; Admin admin = null;

View File

@ -504,7 +504,8 @@ public class TestReplicaWithCluster {
return null; return null;
} }
}; };
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration()); RpcRetryingCallerFactory factory =
new RpcRetryingCallerFactory(HTU.getConfiguration(), conn.getConnectionConfiguration());
RpcRetryingCaller<Void> caller = factory.newCaller(); RpcRetryingCaller<Void> caller = factory.newCaller();
caller.callWithRetries(callable, 10000); caller.callWithRetries(callable, 10000);

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClientServiceCallable; import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RpcRetryingCaller; import org.apache.hadoop.hbase.client.RpcRetryingCaller;
@ -236,7 +237,9 @@ public class TestLowLatencySpaceQuotas {
totalSize += file.getLen(); totalSize += file.getLen();
} }
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration()); final ClusterConnection clusterConn = (ClusterConnection) conn;
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(),
clusterConn.getConnectionConfiguration());
RpcRetryingCaller<Void> caller = factory.<Void> newCaller(); RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
caller.callWithRetries(callable, Integer.MAX_VALUE); caller.callWithRetries(callable, Integer.MAX_VALUE);

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClientServiceCallable; import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
@ -98,7 +99,9 @@ public class TestSpaceQuotaOnBulkLoad {
// The table is now in violation. Try to do a bulk load // The table is now in violation. Try to do a bulk load
ClientServiceCallable<Void> callable = helper.generateFileToLoad(tableName, 1, 50); ClientServiceCallable<Void> callable = helper.generateFileToLoad(tableName, 1, 50);
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration()); ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
RpcRetryingCallerFactory factory =
new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(), conn.getConnectionConfiguration());
RpcRetryingCaller<Void> caller = factory.<Void> newCaller(); RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
try { try {
caller.callWithRetries(callable, Integer.MAX_VALUE); caller.callWithRetries(callable, Integer.MAX_VALUE);
@ -157,7 +160,9 @@ public class TestSpaceQuotaOnBulkLoad {
LOG.debug(file.getPath() + " -> " + file.getLen() + "B"); LOG.debug(file.getPath() + " -> " + file.getLen() + "B");
} }
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration()); ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
RpcRetryingCallerFactory factory =
new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(), conn.getConnectionConfiguration());
RpcRetryingCaller<Void> caller = factory.<Void> newCaller(); RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
try { try {
caller.callWithRetries(callable, Integer.MAX_VALUE); caller.callWithRetries(callable, Integer.MAX_VALUE);

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ClientServiceCallable; import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Increment;
@ -242,7 +243,9 @@ public class TestSpaceQuotas {
// The table is now in violation. Try to do a bulk load // The table is now in violation. Try to do a bulk load
ClientServiceCallable<Void> callable = helper.generateFileToLoad(tableName, 1, 50); ClientServiceCallable<Void> callable = helper.generateFileToLoad(tableName, 1, 50);
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration()); ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
RpcRetryingCallerFactory factory =
new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(), conn.getConnectionConfiguration());
RpcRetryingCaller<Void> caller = factory.newCaller(); RpcRetryingCaller<Void> caller = factory.newCaller();
try { try {
caller.callWithRetries(callable, Integer.MAX_VALUE); caller.callWithRetries(callable, Integer.MAX_VALUE);
@ -301,7 +304,9 @@ public class TestSpaceQuotas {
LOG.debug(file.getPath() + " -> " + file.getLen() + "B"); LOG.debug(file.getPath() + " -> " + file.getLen() + "B");
} }
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration()); ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
RpcRetryingCallerFactory factory =
new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(), conn.getConnectionConfiguration());
RpcRetryingCaller<Void> caller = factory.newCaller(); RpcRetryingCaller<Void> caller = factory.newCaller();
try { try {
caller.callWithRetries(callable, Integer.MAX_VALUE); caller.callWithRetries(callable, Integer.MAX_VALUE);

View File

@ -230,7 +230,8 @@ public class TestHRegionServerBulkLoad {
return null; return null;
} }
}; };
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf); RpcRetryingCallerFactory factory =
new RpcRetryingCallerFactory(conf, conn.getConnectionConfiguration());
RpcRetryingCaller<Void> caller = factory.<Void> newCaller(); RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
caller.callWithRetries(callable, Integer.MAX_VALUE); caller.callWithRetries(callable, Integer.MAX_VALUE);

View File

@ -112,7 +112,8 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul
return null; return null;
} }
}; };
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf); RpcRetryingCallerFactory factory =
new RpcRetryingCallerFactory(conf, conn.getConnectionConfiguration());
RpcRetryingCaller<Void> caller = factory.<Void> newCaller(); RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
caller.callWithRetries(callable, Integer.MAX_VALUE); caller.callWithRetries(callable, Integer.MAX_VALUE);

View File

@ -209,8 +209,9 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
locations.getRegionLocation(1), locations.getRegionLocation(1).getRegionInfo(), row, locations.getRegionLocation(1), locations.getRegionLocation(1).getRegionInfo(), row,
Lists.newArrayList(entry), new AtomicLong()); Lists.newArrayList(entry), new AtomicLong());
RpcRetryingCallerFactory factory = RpcRetryingCallerFactory RpcRetryingCallerFactory factory =
.instantiate(connection.getConfiguration(), connection.getConnectionMetrics()); RpcRetryingCallerFactory.instantiate(connection.getConfiguration(),
connection.getConnectionConfiguration(), connection.getConnectionMetrics());
factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, 10000); factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, 10000);
} }
} }