HBASE-27652 Client-side lock contention around Configuration when using read replica regions
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
ffff78a00d
commit
d151af1663
|
@ -335,8 +335,8 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
|
||||
this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
|
||||
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
|
||||
this.rpcCallerFactory =
|
||||
RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats, this.metrics);
|
||||
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, connectionConfig,
|
||||
interceptor, this.stats, this.metrics);
|
||||
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
|
||||
|
||||
// Do we publish the status?
|
||||
|
@ -2221,8 +2221,8 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
|
||||
@Override
|
||||
public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
|
||||
return RpcRetryingCallerFactory.instantiate(conf, this.interceptor, this.getStatisticsTracker(),
|
||||
metrics);
|
||||
return RpcRetryingCallerFactory.instantiate(conf, connectionConfig, this.interceptor,
|
||||
this.getStatisticsTracker(), metrics);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1305,8 +1305,8 @@ public class HTable implements Table {
|
|||
final List<String> callbackErrorServers = new ArrayList<>();
|
||||
Object[] results = new Object[execs.size()];
|
||||
|
||||
AsyncProcess asyncProcess = new AsyncProcess(
|
||||
connection, configuration, RpcRetryingCallerFactory.instantiate(configuration,
|
||||
AsyncProcess asyncProcess = new AsyncProcess(connection, configuration,
|
||||
RpcRetryingCallerFactory.instantiate(configuration, connConfiguration,
|
||||
connection.getStatisticsTracker(), connection.getConnectionMetrics()),
|
||||
RpcControllerFactory.instantiate(configuration));
|
||||
|
||||
|
|
|
@ -423,8 +423,10 @@ public class HTableMultiplexer {
|
|||
this.addr = addr;
|
||||
this.multiplexer = htableMultiplexer;
|
||||
this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
|
||||
final ConnectionConfiguration connectionConfig =
|
||||
conn != null ? conn.getConnectionConfiguration() : new ConnectionConfiguration(conf);
|
||||
RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf,
|
||||
conn == null ? null : conn.getConnectionMetrics());
|
||||
connectionConfig, conn == null ? null : conn.getConnectionMetrics());
|
||||
RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
|
||||
this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
|
||||
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import com.google.errorprone.annotations.RestrictedApi;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -29,20 +30,18 @@ public class RpcRetryingCallerFactory {
|
|||
|
||||
/** Configuration key for a custom {@link RpcRetryingCaller} */
|
||||
public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class";
|
||||
protected final Configuration conf;
|
||||
private final ConnectionConfiguration connectionConf;
|
||||
private final RetryingCallerInterceptor interceptor;
|
||||
private final int startLogErrorsCnt;
|
||||
private final MetricsConnection metrics;
|
||||
|
||||
public RpcRetryingCallerFactory(Configuration conf) {
|
||||
this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
|
||||
public RpcRetryingCallerFactory(Configuration conf, ConnectionConfiguration connectionConf) {
|
||||
this(conf, connectionConf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
|
||||
}
|
||||
|
||||
public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor,
|
||||
MetricsConnection metrics) {
|
||||
this.conf = conf;
|
||||
this.connectionConf = new ConnectionConfiguration(conf);
|
||||
public RpcRetryingCallerFactory(Configuration conf, ConnectionConfiguration connectionConf,
|
||||
RetryingCallerInterceptor interceptor, MetricsConnection metrics) {
|
||||
this.connectionConf = connectionConf;
|
||||
startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
|
||||
AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
|
||||
this.interceptor = interceptor;
|
||||
|
@ -71,30 +70,39 @@ public class RpcRetryingCallerFactory {
|
|||
interceptor, startLogErrorsCnt, connectionConf.getRpcTimeout(), metrics);
|
||||
}
|
||||
|
||||
@RestrictedApi(explanation = "Should only be called on process initialization", link = "",
|
||||
allowedOnPath = ".*/(HRegionServer|LoadIncrementalHFiles|SecureBulkLoadClient)\\.java")
|
||||
public static RpcRetryingCallerFactory instantiate(Configuration configuration,
|
||||
MetricsConnection metrics) {
|
||||
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null,
|
||||
metrics);
|
||||
return instantiate(configuration, new ConnectionConfiguration(configuration), metrics);
|
||||
}
|
||||
|
||||
public static RpcRetryingCallerFactory instantiate(Configuration configuration,
|
||||
ConnectionConfiguration connectionConf, MetricsConnection metrics) {
|
||||
return instantiate(configuration, connectionConf,
|
||||
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, metrics);
|
||||
}
|
||||
|
||||
public static RpcRetryingCallerFactory instantiate(Configuration configuration,
|
||||
ConnectionConfiguration connectionConf, ServerStatisticTracker stats,
|
||||
MetricsConnection metrics) {
|
||||
return instantiate(configuration, connectionConf,
|
||||
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats, metrics);
|
||||
}
|
||||
|
||||
public static RpcRetryingCallerFactory instantiate(Configuration configuration,
|
||||
ConnectionConfiguration connectionConf, RetryingCallerInterceptor interceptor,
|
||||
ServerStatisticTracker stats, MetricsConnection metrics) {
|
||||
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats,
|
||||
metrics);
|
||||
}
|
||||
|
||||
public static RpcRetryingCallerFactory instantiate(Configuration configuration,
|
||||
RetryingCallerInterceptor interceptor, ServerStatisticTracker stats,
|
||||
MetricsConnection metrics) {
|
||||
String clazzName = RpcRetryingCallerFactory.class.getName();
|
||||
String rpcCallerFactoryClazz =
|
||||
configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName);
|
||||
RpcRetryingCallerFactory factory;
|
||||
if (rpcCallerFactoryClazz.equals(clazzName)) {
|
||||
factory = new RpcRetryingCallerFactory(configuration, interceptor, metrics);
|
||||
factory = new RpcRetryingCallerFactory(configuration, connectionConf, interceptor, metrics);
|
||||
} else {
|
||||
factory = ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
|
||||
new Class[] { Configuration.class }, new Object[] { configuration });
|
||||
new Class[] { Configuration.class, ConnectionConfiguration.class },
|
||||
new Object[] { configuration, connectionConf });
|
||||
}
|
||||
return factory;
|
||||
}
|
||||
|
|
|
@ -81,7 +81,8 @@ public class RpcRetryingCallerWithReadReplicas {
|
|||
this.operationTimeout = operationTimeout;
|
||||
this.rpcTimeout = rpcTimeout;
|
||||
this.timeBeforeReplicas = timeBeforeReplicas;
|
||||
this.rpcRetryingCallerFactory = new RpcRetryingCallerFactory(conf);
|
||||
this.rpcRetryingCallerFactory =
|
||||
new RpcRetryingCallerFactory(conf, cConnection.getConnectionConfiguration());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -55,7 +55,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
|
|||
private static final Logger LOG = LoggerFactory.getLogger(ScannerCallableWithReplicas.class);
|
||||
volatile ScannerCallable currentScannerCallable;
|
||||
AtomicBoolean replicaSwitched = new AtomicBoolean(false);
|
||||
final ClusterConnection cConnection;
|
||||
private final ClusterConnection cConnection;
|
||||
protected final ExecutorService pool;
|
||||
protected final int timeBeforeReplicas;
|
||||
private final Scan scan;
|
||||
|
@ -175,12 +175,15 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
|
|||
}
|
||||
regionReplication = rl.size();
|
||||
}
|
||||
// allocate a boundedcompletion pool of some multiple of number of replicas.
|
||||
// We want to accomodate some RPCs for redundant replica scans (but are still in progress)
|
||||
// allocate a bounded-completion pool of some multiple of number of replicas.
|
||||
// We want to accommodate some RPCs for redundant replica scans (but are still in progress)
|
||||
final ConnectionConfiguration connectionConfig = cConnection != null
|
||||
? cConnection.getConnectionConfiguration()
|
||||
: new ConnectionConfiguration(ScannerCallableWithReplicas.this.conf);
|
||||
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
|
||||
new ResultBoundedCompletionService<>(
|
||||
RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf,
|
||||
cConnection == null ? null : cConnection.getConnectionMetrics()),
|
||||
connectionConfig, cConnection == null ? null : cConnection.getConnectionMetrics()),
|
||||
pool, regionReplication * 5);
|
||||
|
||||
AtomicBoolean done = new AtomicBoolean(false);
|
||||
|
@ -382,9 +385,12 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
|
|||
// and we can't invoke it multiple times at the same time)
|
||||
this.caller = ScannerCallableWithReplicas.this.caller;
|
||||
if (scan.getConsistency() == Consistency.TIMELINE) {
|
||||
final ConnectionConfiguration connectionConfig = cConnection != null
|
||||
? cConnection.getConnectionConfiguration()
|
||||
: new ConnectionConfiguration(ScannerCallableWithReplicas.this.conf);
|
||||
this.caller =
|
||||
RpcRetryingCallerFactory
|
||||
.instantiate(ScannerCallableWithReplicas.this.conf,
|
||||
.instantiate(ScannerCallableWithReplicas.this.conf, connectionConfig,
|
||||
cConnection == null ? null : cConnection.getConnectionMetrics())
|
||||
.<Result[]> newCaller();
|
||||
}
|
||||
|
|
|
@ -69,7 +69,7 @@ public class SecureBulkLoadClient {
|
|||
return response.getBulkToken();
|
||||
}
|
||||
};
|
||||
return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null, null)
|
||||
return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
|
||||
.<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
|
||||
} catch (Throwable throwable) {
|
||||
throw new IOException(throwable);
|
||||
|
@ -91,7 +91,7 @@ public class SecureBulkLoadClient {
|
|||
return null;
|
||||
}
|
||||
};
|
||||
RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null, null).<Void> newCaller()
|
||||
RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null).<Void> newCaller()
|
||||
.callWithRetries(callable, Integer.MAX_VALUE);
|
||||
} catch (Throwable throwable) {
|
||||
throw new IOException(throwable);
|
||||
|
|
|
@ -184,13 +184,15 @@ public class TestAsyncProcess {
|
|||
}
|
||||
|
||||
public MyAsyncProcess(ClusterConnection hc, Configuration conf) {
|
||||
super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
|
||||
super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()),
|
||||
new RpcControllerFactory(conf));
|
||||
service = Executors.newFixedThreadPool(5);
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
|
||||
super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
|
||||
super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()),
|
||||
new RpcControllerFactory(conf));
|
||||
service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<>(),
|
||||
new CountingThreadFactory(nbThreads));
|
||||
}
|
||||
|
@ -1702,7 +1704,8 @@ public class TestAsyncProcess {
|
|||
|
||||
static class AsyncProcessForThrowableCheck extends AsyncProcess {
|
||||
public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) {
|
||||
super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
|
||||
super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()),
|
||||
new RpcControllerFactory(conf));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -68,6 +68,8 @@ public class TestAsyncProcessWithRegionException {
|
|||
private static final Result EMPTY_RESULT = Result.create(null, true);
|
||||
private static final IOException IOE = new IOException("YOU CAN'T PASS");
|
||||
private static final Configuration CONF = new Configuration();
|
||||
private static final ConnectionConfiguration CONNECTION_CONFIG =
|
||||
new ConnectionConfiguration(CONF);
|
||||
private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE");
|
||||
private static final byte[] GOOD_ROW = Bytes.toBytes("GOOD_ROW");
|
||||
private static final byte[] BAD_ROW = Bytes.toBytes("BAD_ROW");
|
||||
|
@ -175,7 +177,7 @@ public class TestAsyncProcessWithRegionException {
|
|||
Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
|
||||
Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
|
||||
Mockito.when(hc.getConfiguration()).thenReturn(CONF);
|
||||
Mockito.when(hc.getConnectionConfiguration()).thenReturn(new ConnectionConfiguration(CONF));
|
||||
Mockito.when(hc.getConnectionConfiguration()).thenReturn(CONNECTION_CONFIG);
|
||||
setMockLocation(hc, GOOD_ROW, new RegionLocations(REGION_LOCATION));
|
||||
setMockLocation(hc, BAD_ROW, new RegionLocations(REGION_LOCATION));
|
||||
Mockito
|
||||
|
@ -196,7 +198,8 @@ public class TestAsyncProcessWithRegionException {
|
|||
private final ExecutorService service = Executors.newFixedThreadPool(5);
|
||||
|
||||
MyAsyncProcess(ClusterConnection hc, Configuration conf) {
|
||||
super(hc, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
|
||||
super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()),
|
||||
new RpcControllerFactory(conf));
|
||||
}
|
||||
|
||||
public AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows)
|
||||
|
|
|
@ -70,6 +70,7 @@ public class TestClientScanner {
|
|||
Scan scan;
|
||||
ExecutorService pool;
|
||||
Configuration conf;
|
||||
ConnectionConfiguration connectionConfig;
|
||||
|
||||
ClusterConnection clusterConn;
|
||||
RpcRetryingCallerFactory rpcFactory;
|
||||
|
@ -86,7 +87,9 @@ public class TestClientScanner {
|
|||
pool = Executors.newSingleThreadExecutor();
|
||||
scan = new Scan();
|
||||
conf = new Configuration();
|
||||
connectionConfig = new ConnectionConfiguration(conf);
|
||||
Mockito.when(clusterConn.getConfiguration()).thenReturn(conf);
|
||||
Mockito.when(clusterConn.getConnectionConfiguration()).thenReturn(connectionConfig);
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -473,7 +476,7 @@ public class TestClientScanner {
|
|||
|
||||
// Mock a caller which calls the callable for ScannerCallableWithReplicas,
|
||||
// but throws an exception for the actual scanner calls via callWithRetries.
|
||||
rpcFactory = new MockRpcRetryingCallerFactory(conf);
|
||||
rpcFactory = new MockRpcRetryingCallerFactory(conf, connectionConfig);
|
||||
conf.set(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY,
|
||||
MockRpcRetryingCallerFactory.class.getName());
|
||||
|
||||
|
@ -496,8 +499,9 @@ public class TestClientScanner {
|
|||
|
||||
public static class MockRpcRetryingCallerFactory extends RpcRetryingCallerFactory {
|
||||
|
||||
public MockRpcRetryingCallerFactory(Configuration conf) {
|
||||
super(conf);
|
||||
public MockRpcRetryingCallerFactory(Configuration conf,
|
||||
ConnectionConfiguration connectionConf) {
|
||||
super(conf, connectionConf);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -87,13 +87,14 @@ public class TestSnapshotFromAdmin {
|
|||
// setup the conf to match the expected properties
|
||||
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries);
|
||||
conf.setLong("hbase.client.pause", pauseTime);
|
||||
ConnectionConfiguration connectionConfig = new ConnectionConfiguration(conf);
|
||||
|
||||
// mock the master admin to our mock
|
||||
MasterKeepAliveConnection mockMaster = Mockito.mock(MasterKeepAliveConnection.class);
|
||||
Mockito.when(mockConnection.getConfiguration()).thenReturn(conf);
|
||||
Mockito.when(mockConnection.getMaster()).thenReturn(mockMaster);
|
||||
// we need a real retrying caller
|
||||
RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf);
|
||||
RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf, connectionConfig);
|
||||
RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class);
|
||||
Mockito.when(controllerFactory.newController())
|
||||
.thenReturn(Mockito.mock(HBaseRpcController.class));
|
||||
|
@ -134,9 +135,10 @@ public class TestSnapshotFromAdmin {
|
|||
public void testValidateSnapshotName() throws Exception {
|
||||
ConnectionImplementation mockConnection = Mockito.mock(ConnectionImplementation.class);
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
ConnectionConfiguration connectionConfig = new ConnectionConfiguration(conf);
|
||||
Mockito.when(mockConnection.getConfiguration()).thenReturn(conf);
|
||||
// we need a real retrying caller
|
||||
RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf);
|
||||
RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf, connectionConfig);
|
||||
RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class);
|
||||
Mockito.when(controllerFactory.newController())
|
||||
.thenReturn(Mockito.mock(HBaseRpcController.class));
|
||||
|
|
|
@ -125,7 +125,8 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
|
|||
return null;
|
||||
}
|
||||
};
|
||||
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
|
||||
RpcRetryingCallerFactory factory =
|
||||
new RpcRetryingCallerFactory(conf, conn.getConnectionConfiguration());
|
||||
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
|
||||
caller.callWithRetries(callable, Integer.MAX_VALUE);
|
||||
|
||||
|
|
|
@ -390,8 +390,9 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
|
|||
this.sink = sink;
|
||||
this.connection = connection;
|
||||
this.operationTimeout = operationTimeout;
|
||||
this.rpcRetryingCallerFactory = RpcRetryingCallerFactory
|
||||
.instantiate(connection.getConfiguration(), connection.getConnectionMetrics());
|
||||
this.rpcRetryingCallerFactory =
|
||||
RpcRetryingCallerFactory.instantiate(connection.getConfiguration(),
|
||||
connection.getConnectionConfiguration(), connection.getConnectionMetrics());
|
||||
this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
|
||||
this.pool = pool;
|
||||
this.tableDescriptors = tableDescriptors;
|
||||
|
|
|
@ -872,7 +872,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
List<LoadQueueItem> toRetry = new ArrayList<>();
|
||||
try {
|
||||
Configuration conf = getConf();
|
||||
byte[] region = RpcRetryingCallerFactory.instantiate(conf, null, null).<byte[]> newCaller()
|
||||
byte[] region = RpcRetryingCallerFactory.instantiate(conf, null).<byte[]> newCaller()
|
||||
.callWithRetries(serviceCallable, Integer.MAX_VALUE);
|
||||
if (region == null) {
|
||||
LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first)
|
||||
|
|
|
@ -62,12 +62,14 @@ public class HConnectionTestingUtility {
|
|||
*/
|
||||
public static ClusterConnection getMockedConnection(final Configuration conf)
|
||||
throws ZooKeeperConnectionException {
|
||||
ConnectionConfiguration connectionConfig = new ConnectionConfiguration(conf);
|
||||
ConnectionImplementation connection = Mockito.mock(ConnectionImplementation.class);
|
||||
Mockito.when(connection.getConfiguration()).thenReturn(conf);
|
||||
Mockito.when(connection.getConnectionConfiguration()).thenReturn(connectionConfig);
|
||||
Mockito.when(connection.getRpcControllerFactory())
|
||||
.thenReturn(Mockito.mock(RpcControllerFactory.class));
|
||||
// we need a real retrying caller
|
||||
RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf);
|
||||
RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf, connectionConfig);
|
||||
Mockito.when(connection.getRpcRetryingCallerFactory()).thenReturn(callerFactory);
|
||||
return connection;
|
||||
}
|
||||
|
@ -123,11 +125,12 @@ public class HConnectionTestingUtility {
|
|||
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
|
||||
Mockito.when(c.getNonceGenerator()).thenReturn(ng);
|
||||
AsyncProcess asyncProcess = new AsyncProcess(c, conf,
|
||||
RpcRetryingCallerFactory.instantiate(conf, c.getConnectionMetrics()),
|
||||
RpcRetryingCallerFactory.instantiate(conf, connectionConfiguration, c.getConnectionMetrics()),
|
||||
RpcControllerFactory.instantiate(conf));
|
||||
Mockito.when(c.getAsyncProcess()).thenReturn(asyncProcess);
|
||||
Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(RpcRetryingCallerFactory
|
||||
.instantiate(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, null));
|
||||
Mockito.when(c.getNewRpcRetryingCallerFactory(conf))
|
||||
.thenReturn(RpcRetryingCallerFactory.instantiate(conf, connectionConfiguration,
|
||||
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, null));
|
||||
Mockito.when(c.getRpcControllerFactory()).thenReturn(Mockito.mock(RpcControllerFactory.class));
|
||||
Table t = Mockito.mock(Table.class);
|
||||
Mockito.when(c.getTable((TableName) Mockito.any())).thenReturn(t);
|
||||
|
|
|
@ -1138,8 +1138,8 @@ public class TestConnectionImplementation {
|
|||
|
||||
private final Class<? extends HBaseServerException> exceptionClass;
|
||||
|
||||
public ThrowingCallerFactory(Configuration conf) {
|
||||
super(conf);
|
||||
public ThrowingCallerFactory(Configuration conf, ConnectionConfiguration connectionConfig) {
|
||||
super(conf, connectionConfig);
|
||||
this.exceptionClass =
|
||||
conf.getClass("testSpecialPauseException", null, HBaseServerException.class);
|
||||
}
|
||||
|
|
|
@ -295,6 +295,8 @@ public class TestHBaseAdminNoCluster {
|
|||
|
||||
ClusterConnection connection = mock(ClusterConnection.class);
|
||||
when(connection.getConfiguration()).thenReturn(configuration);
|
||||
ConnectionConfiguration connectionConfig = new ConnectionConfiguration(configuration);
|
||||
when(connection.getConnectionConfiguration()).thenReturn(connectionConfig);
|
||||
MasterKeepAliveConnection masterAdmin =
|
||||
Mockito.mock(MasterKeepAliveConnection.class, new Answer() {
|
||||
@Override
|
||||
|
@ -312,7 +314,8 @@ public class TestHBaseAdminNoCluster {
|
|||
.thenReturn(Mockito.mock(HBaseRpcController.class));
|
||||
|
||||
// we need a real retrying caller
|
||||
RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(configuration);
|
||||
RpcRetryingCallerFactory callerFactory =
|
||||
new RpcRetryingCallerFactory(configuration, connectionConfig);
|
||||
Mockito.when(connection.getRpcRetryingCallerFactory()).thenReturn(callerFactory);
|
||||
|
||||
Admin admin = null;
|
||||
|
|
|
@ -504,7 +504,8 @@ public class TestReplicaWithCluster {
|
|||
return null;
|
||||
}
|
||||
};
|
||||
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration());
|
||||
RpcRetryingCallerFactory factory =
|
||||
new RpcRetryingCallerFactory(HTU.getConfiguration(), conn.getConnectionConfiguration());
|
||||
RpcRetryingCaller<Void> caller = factory.newCaller();
|
||||
caller.callWithRetries(callable, 10000);
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.ServerName;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.ClientServiceCallable;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
|
||||
|
@ -236,7 +237,9 @@ public class TestLowLatencySpaceQuotas {
|
|||
totalSize += file.getLen();
|
||||
}
|
||||
|
||||
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration());
|
||||
final ClusterConnection clusterConn = (ClusterConnection) conn;
|
||||
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(),
|
||||
clusterConn.getConnectionConfiguration());
|
||||
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
|
||||
caller.callWithRetries(callable, Integer.MAX_VALUE);
|
||||
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
|
|||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.ClientServiceCallable;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
|
@ -98,7 +99,9 @@ public class TestSpaceQuotaOnBulkLoad {
|
|||
|
||||
// The table is now in violation. Try to do a bulk load
|
||||
ClientServiceCallable<Void> callable = helper.generateFileToLoad(tableName, 1, 50);
|
||||
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration());
|
||||
ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
|
||||
RpcRetryingCallerFactory factory =
|
||||
new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(), conn.getConnectionConfiguration());
|
||||
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
|
||||
try {
|
||||
caller.callWithRetries(callable, Integer.MAX_VALUE);
|
||||
|
@ -157,7 +160,9 @@ public class TestSpaceQuotaOnBulkLoad {
|
|||
LOG.debug(file.getPath() + " -> " + file.getLen() + "B");
|
||||
}
|
||||
|
||||
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration());
|
||||
ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
|
||||
RpcRetryingCallerFactory factory =
|
||||
new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(), conn.getConnectionConfiguration());
|
||||
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
|
||||
try {
|
||||
caller.callWithRetries(callable, Integer.MAX_VALUE);
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.TableName;
|
|||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.ClientServiceCallable;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Increment;
|
||||
|
@ -242,7 +243,9 @@ public class TestSpaceQuotas {
|
|||
|
||||
// The table is now in violation. Try to do a bulk load
|
||||
ClientServiceCallable<Void> callable = helper.generateFileToLoad(tableName, 1, 50);
|
||||
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration());
|
||||
ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
|
||||
RpcRetryingCallerFactory factory =
|
||||
new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(), conn.getConnectionConfiguration());
|
||||
RpcRetryingCaller<Void> caller = factory.newCaller();
|
||||
try {
|
||||
caller.callWithRetries(callable, Integer.MAX_VALUE);
|
||||
|
@ -301,7 +304,9 @@ public class TestSpaceQuotas {
|
|||
LOG.debug(file.getPath() + " -> " + file.getLen() + "B");
|
||||
}
|
||||
|
||||
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration());
|
||||
ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
|
||||
RpcRetryingCallerFactory factory =
|
||||
new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(), conn.getConnectionConfiguration());
|
||||
RpcRetryingCaller<Void> caller = factory.newCaller();
|
||||
try {
|
||||
caller.callWithRetries(callable, Integer.MAX_VALUE);
|
||||
|
|
|
@ -230,7 +230,8 @@ public class TestHRegionServerBulkLoad {
|
|||
return null;
|
||||
}
|
||||
};
|
||||
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
|
||||
RpcRetryingCallerFactory factory =
|
||||
new RpcRetryingCallerFactory(conf, conn.getConnectionConfiguration());
|
||||
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
|
||||
caller.callWithRetries(callable, Integer.MAX_VALUE);
|
||||
|
||||
|
|
|
@ -112,7 +112,8 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul
|
|||
return null;
|
||||
}
|
||||
};
|
||||
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
|
||||
RpcRetryingCallerFactory factory =
|
||||
new RpcRetryingCallerFactory(conf, conn.getConnectionConfiguration());
|
||||
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
|
||||
caller.callWithRetries(callable, Integer.MAX_VALUE);
|
||||
|
||||
|
|
|
@ -209,8 +209,9 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
|
|||
locations.getRegionLocation(1), locations.getRegionLocation(1).getRegionInfo(), row,
|
||||
Lists.newArrayList(entry), new AtomicLong());
|
||||
|
||||
RpcRetryingCallerFactory factory = RpcRetryingCallerFactory
|
||||
.instantiate(connection.getConfiguration(), connection.getConnectionMetrics());
|
||||
RpcRetryingCallerFactory factory =
|
||||
RpcRetryingCallerFactory.instantiate(connection.getConfiguration(),
|
||||
connection.getConnectionConfiguration(), connection.getConnectionMetrics());
|
||||
factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, 10000);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue