HBASE-26809: Report client backoff time for server overloaded (#4786)

Co-authored-by: Briana Augenreich <baugenreich@hubspot.com>
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
This commit is contained in:
Bri Augenreich 2022-11-15 15:41:08 -05:00 committed by Bryan Beaudreault
parent 3880884e8e
commit 2d038edf4c
20 changed files with 103 additions and 47 deletions

View File

@ -487,6 +487,10 @@ class AsyncBatchRpcRetryingCaller<T> {
} else { } else {
delayNs = getPauseTime(pauseNsToUse, tries - 1); delayNs = getPauseTime(pauseNsToUse, tries - 1);
} }
if (isServerOverloaded) {
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
}
retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS); retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);
} }

View File

@ -747,6 +747,12 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
backOffTime = errorsByServer.calculateBackoffTime(oldServer, backOffTime = errorsByServer.calculateBackoffTime(oldServer,
asyncProcess.connectionConfiguration.getPauseMillis()); asyncProcess.connectionConfiguration.getPauseMillis());
} }
MetricsConnection metrics = asyncProcess.connection.getConnectionMetrics();
if (metrics != null && HBaseServerException.isServerOverloaded(throwable)) {
metrics.incrementServerOverloadedBackoffTime(backOffTime, TimeUnit.MILLISECONDS);
}
if (numAttempt > asyncProcess.startLogErrorsCnt) { if (numAttempt > asyncProcess.startLogErrorsCnt) {
// We use this value to have some logs when we have multiple failures, but not too many // We use this value to have some logs when we have multiple failures, but not too many
// logs, as errors are to be expected when a region moves, splits and so on // logs, as errors are to be expected when a region moves, splits and so on

View File

@ -139,6 +139,11 @@ public abstract class AsyncRpcRetryingCaller<T> {
delayNs = getPauseTime(pauseNsToUse, tries - 1); delayNs = getPauseTime(pauseNsToUse, tries - 1);
} }
tries++; tries++;
if (HBaseServerException.isServerOverloaded(error)) {
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
}
retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS); retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS);
} }

View File

@ -113,6 +113,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private final Runnable completeWhenNoMoreResultsInRegion; private final Runnable completeWhenNoMoreResultsInRegion;
protected final AsyncConnectionImpl conn;
private final CompletableFuture<Boolean> future; private final CompletableFuture<Boolean> future;
private final HBaseRpcController controller; private final HBaseRpcController controller;
@ -318,6 +320,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt) { int startLogErrorsCnt) {
this.retryTimer = retryTimer; this.retryTimer = retryTimer;
this.conn = conn;
this.scan = scan; this.scan = scan;
this.scanMetrics = scanMetrics; this.scanMetrics = scanMetrics;
this.scannerId = scannerId; this.scannerId = scannerId;
@ -441,6 +444,10 @@ class AsyncScanSingleRegionRpcRetryingCaller {
return; return;
} }
tries++; tries++;
if (HBaseServerException.isServerOverloaded(error)) {
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
}
retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS); retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS);
} }

View File

@ -291,10 +291,8 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
this.stats = ServerStatisticTracker.create(conf); this.stats = ServerStatisticTracker.create(conf);
this.interceptor = new RetryingCallerInterceptorFactory(conf).build(); this.interceptor = new RetryingCallerInterceptorFactory(conf).build();
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
boolean shouldListen = boolean shouldListen =
conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT); conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT);
@ -322,6 +320,10 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
this.metaCache = new MetaCache(this.metrics); this.metaCache = new MetaCache(this.metrics);
this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics); this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.rpcCallerFactory =
RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats, this.metrics);
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
// Do we publish the status? // Do we publish the status?
if (shouldListen) { if (shouldListen) {
@ -1048,6 +1050,11 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
// Only relocate the parent region if necessary // Only relocate the parent region if necessary
relocateMeta = relocateMeta =
!(e instanceof RegionOfflineException || e instanceof NoServerForRegionException); !(e instanceof RegionOfflineException || e instanceof NoServerForRegionException);
if (metrics != null && HBaseServerException.isServerOverloaded(e)) {
metrics.incrementServerOverloadedBackoffTime(
ConnectionUtils.getPauseTime(pauseBase, tries), TimeUnit.MILLISECONDS);
}
} finally { } finally {
userRegionLock.unlock(); userRegionLock.unlock();
} }
@ -2175,8 +2182,8 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
@Override @Override
public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) { public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
return RpcRetryingCallerFactory.instantiate(conf, this.interceptor, return RpcRetryingCallerFactory.instantiate(conf, this.interceptor, this.getStatisticsTracker(),
this.getStatisticsTracker()); metrics);
} }
@Override @Override

View File

@ -1305,8 +1305,9 @@ public class HTable implements Table {
final List<String> callbackErrorServers = new ArrayList<>(); final List<String> callbackErrorServers = new ArrayList<>();
Object[] results = new Object[execs.size()]; Object[] results = new Object[execs.size()];
AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, AsyncProcess asyncProcess = new AsyncProcess(
RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), connection, configuration, RpcRetryingCallerFactory.instantiate(configuration,
connection.getStatisticsTracker(), connection.getConnectionMetrics()),
RpcControllerFactory.instantiate(configuration)); RpcControllerFactory.instantiate(configuration));
Batch.Callback<ClientProtos.CoprocessorServiceResult> resultsCallback = Batch.Callback<ClientProtos.CoprocessorServiceResult> resultsCallback =

View File

@ -423,7 +423,8 @@ public class HTableMultiplexer {
this.addr = addr; this.addr = addr;
this.multiplexer = htableMultiplexer; this.multiplexer = htableMultiplexer;
this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize); this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf); RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf,
conn == null ? null : conn.getConnectionMetrics());
RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf); RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));

View File

@ -315,6 +315,7 @@ public class MetricsConnection implements StatisticTrackable {
protected final Histogram numActionsPerServerHist; protected final Histogram numActionsPerServerHist;
protected final Counter nsLookups; protected final Counter nsLookups;
protected final Counter nsLookupsFailed; protected final Counter nsLookupsFailed;
protected final Timer overloadedBackoffTimer;
// dynamic metrics // dynamic metrics
@ -377,6 +378,9 @@ public class MetricsConnection implements StatisticTrackable {
this.nsLookups = registry.counter(name(this.getClass(), NS_LOOKUPS, scope)); this.nsLookups = registry.counter(name(this.getClass(), NS_LOOKUPS, scope));
this.nsLookupsFailed = registry.counter(name(this.getClass(), NS_LOOKUPS_FAILED, scope)); this.nsLookupsFailed = registry.counter(name(this.getClass(), NS_LOOKUPS_FAILED, scope));
this.overloadedBackoffTimer =
registry.timer(name(this.getClass(), "overloadedBackoffDurationMs", scope));
this.reporter = JmxReporter.forRegistry(this.registry).build(); this.reporter = JmxReporter.forRegistry(this.registry).build();
this.reporter.start(); this.reporter.start();
} }
@ -449,6 +453,10 @@ public class MetricsConnection implements StatisticTrackable {
this.runnerStats.updateDelayInterval(interval); this.runnerStats.updateDelayInterval(interval);
} }
public void incrementServerOverloadedBackoffTime(long time, TimeUnit timeUnit) {
overloadedBackoffTimer.update(time, timeUnit);
}
/** /**
* Get a metric for {@code key} from {@code map}, or create it with {@code factory}. * Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
*/ */

View File

@ -33,17 +33,20 @@ public class RpcRetryingCallerFactory {
private final ConnectionConfiguration connectionConf; private final ConnectionConfiguration connectionConf;
private final RetryingCallerInterceptor interceptor; private final RetryingCallerInterceptor interceptor;
private final int startLogErrorsCnt; private final int startLogErrorsCnt;
private final MetricsConnection metrics;
public RpcRetryingCallerFactory(Configuration conf) { public RpcRetryingCallerFactory(Configuration conf) {
this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR); this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
} }
public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor) { public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor,
MetricsConnection metrics) {
this.conf = conf; this.conf = conf;
this.connectionConf = new ConnectionConfiguration(conf); this.connectionConf = new ConnectionConfiguration(conf);
startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT); AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
this.interceptor = interceptor; this.interceptor = interceptor;
this.metrics = metrics;
} }
/** /**
@ -54,7 +57,7 @@ public class RpcRetryingCallerFactory {
// is cheap as it does not require parsing a complex structure. // is cheap as it does not require parsing a complex structure.
return new RpcRetryingCallerImpl<>(connectionConf.getPauseMillis(), return new RpcRetryingCallerImpl<>(connectionConf.getPauseMillis(),
connectionConf.getPauseMillisForServerOverloaded(), connectionConf.getRetriesNumber(), connectionConf.getPauseMillisForServerOverloaded(), connectionConf.getRetriesNumber(),
interceptor, startLogErrorsCnt, rpcTimeout); interceptor, startLogErrorsCnt, rpcTimeout, metrics);
} }
/** /**
@ -65,26 +68,30 @@ public class RpcRetryingCallerFactory {
// is cheap as it does not require parsing a complex structure. // is cheap as it does not require parsing a complex structure.
return new RpcRetryingCallerImpl<>(connectionConf.getPauseMillis(), return new RpcRetryingCallerImpl<>(connectionConf.getPauseMillis(),
connectionConf.getPauseMillisForServerOverloaded(), connectionConf.getRetriesNumber(), connectionConf.getPauseMillisForServerOverloaded(), connectionConf.getRetriesNumber(),
interceptor, startLogErrorsCnt, connectionConf.getRpcTimeout()); interceptor, startLogErrorsCnt, connectionConf.getRpcTimeout(), metrics);
}
public static RpcRetryingCallerFactory instantiate(Configuration configuration) {
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null);
} }
public static RpcRetryingCallerFactory instantiate(Configuration configuration, public static RpcRetryingCallerFactory instantiate(Configuration configuration,
ServerStatisticTracker stats) { MetricsConnection metrics) {
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats); return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null,
metrics);
} }
public static RpcRetryingCallerFactory instantiate(Configuration configuration, public static RpcRetryingCallerFactory instantiate(Configuration configuration,
RetryingCallerInterceptor interceptor, ServerStatisticTracker stats) { ServerStatisticTracker stats, MetricsConnection metrics) {
return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats,
metrics);
}
public static RpcRetryingCallerFactory instantiate(Configuration configuration,
RetryingCallerInterceptor interceptor, ServerStatisticTracker stats,
MetricsConnection metrics) {
String clazzName = RpcRetryingCallerFactory.class.getName(); String clazzName = RpcRetryingCallerFactory.class.getName();
String rpcCallerFactoryClazz = String rpcCallerFactoryClazz =
configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName); configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName);
RpcRetryingCallerFactory factory; RpcRetryingCallerFactory factory;
if (rpcCallerFactoryClazz.equals(clazzName)) { if (rpcCallerFactoryClazz.equals(clazzName)) {
factory = new RpcRetryingCallerFactory(configuration, interceptor); factory = new RpcRetryingCallerFactory(configuration, interceptor, metrics);
} else { } else {
factory = ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz, factory = ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
new Class[] { Configuration.class }, new Object[] { configuration }); new Class[] { Configuration.class }, new Object[] { configuration });

View File

@ -26,6 +26,7 @@ import java.net.SocketTimeoutException;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseServerException; import org.apache.hadoop.hbase.HBaseServerException;
@ -63,15 +64,11 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
private final RetryingCallerInterceptor interceptor; private final RetryingCallerInterceptor interceptor;
private final RetryingCallerInterceptorContext context; private final RetryingCallerInterceptorContext context;
private final RetryingTimeTracker tracker; private final RetryingTimeTracker tracker;
private final MetricsConnection metrics;
public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries, public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries,
int startLogErrorsCnt) { RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout,
this(pause, pauseForServerOverloaded, retries, MetricsConnection metricsConnection) {
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0);
}
public RpcRetryingCallerImpl(long pause, long pauseForServerOverloaded, int retries,
RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) {
this.pause = pause; this.pause = pause;
this.pauseForServerOverloaded = pauseForServerOverloaded; this.pauseForServerOverloaded = pauseForServerOverloaded;
this.maxAttempts = retries2Attempts(retries); this.maxAttempts = retries2Attempts(retries);
@ -80,6 +77,7 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
this.startLogErrorsCnt = startLogErrorsCnt; this.startLogErrorsCnt = startLogErrorsCnt;
this.tracker = new RetryingTimeTracker(); this.tracker = new RetryingTimeTracker();
this.rpcTimeout = rpcTimeout; this.rpcTimeout = rpcTimeout;
this.metrics = metricsConnection;
} }
@Override @Override
@ -158,6 +156,9 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
+ t.getMessage() + " " + callable.getExceptionMessageAdditionalDetail(); + t.getMessage() + " " + callable.getExceptionMessageAdditionalDetail();
throw (SocketTimeoutException) new SocketTimeoutException(msg).initCause(t); throw (SocketTimeoutException) new SocketTimeoutException(msg).initCause(t);
} }
if (metrics != null && HBaseServerException.isServerOverloaded(t)) {
metrics.incrementServerOverloadedBackoffTime(expectedSleep, TimeUnit.MILLISECONDS);
}
} finally { } finally {
interceptor.updateFailureInfo(context); interceptor.updateFailureInfo(context);
} }

View File

@ -179,8 +179,9 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
// We want to accomodate some RPCs for redundant replica scans (but are still in progress) // We want to accomodate some RPCs for redundant replica scans (but are still in progress)
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs = ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
new ResultBoundedCompletionService<>( new ResultBoundedCompletionService<>(
RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool, RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf,
regionReplication * 5); cConnection == null ? null : cConnection.getConnectionMetrics()),
pool, regionReplication * 5);
AtomicBoolean done = new AtomicBoolean(false); AtomicBoolean done = new AtomicBoolean(false);
replicaSwitched.set(false); replicaSwitched.set(false);
@ -381,8 +382,11 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
// and we can't invoke it multiple times at the same time) // and we can't invoke it multiple times at the same time)
this.caller = ScannerCallableWithReplicas.this.caller; this.caller = ScannerCallableWithReplicas.this.caller;
if (scan.getConsistency() == Consistency.TIMELINE) { if (scan.getConsistency() == Consistency.TIMELINE) {
this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf).< this.caller =
Result[]> newCaller(); RpcRetryingCallerFactory
.instantiate(ScannerCallableWithReplicas.this.conf,
cConnection == null ? null : cConnection.getConnectionMetrics())
.<Result[]> newCaller();
} }
} }

View File

@ -69,7 +69,7 @@ public class SecureBulkLoadClient {
return response.getBulkToken(); return response.getBulkToken();
} }
}; };
return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null) return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null, null)
.<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE); .<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
} catch (Throwable throwable) { } catch (Throwable throwable) {
throw new IOException(throwable); throw new IOException(throwable);
@ -91,7 +91,7 @@ public class SecureBulkLoadClient {
return null; return null;
} }
}; };
RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null).<Void> newCaller() RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null, null).<Void> newCaller()
.callWithRetries(callable, Integer.MAX_VALUE); .callWithRetries(callable, Integer.MAX_VALUE);
} catch (Throwable throwable) { } catch (Throwable throwable) {
throw new IOException(throwable); throw new IOException(throwable);

View File

@ -246,7 +246,8 @@ public class TestAsyncProcess {
} }
}); });
return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 9) { return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10,
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null) {
@Override @Override
public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
int callTimeout) throws IOException, RuntimeException { int callTimeout) throws IOException, RuntimeException {
@ -307,7 +308,7 @@ public class TestAsyncProcess {
private final IOException e; private final IOException e;
public CallerWithFailure(IOException e) { public CallerWithFailure(IOException e) {
super(100, 500, 100, 9); super(100, 500, 100, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null);
this.e = e; this.e = e;
} }
@ -412,7 +413,8 @@ public class TestAsyncProcess {
replicaCalls.incrementAndGet(); replicaCalls.incrementAndGet();
} }
return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 9) { return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10,
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null) {
@Override @Override
public MultiResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, public MultiResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
int callTimeout) throws IOException, RuntimeException { int callTimeout) throws IOException, RuntimeException {

View File

@ -222,7 +222,8 @@ public class TestAsyncProcessWithRegionException {
}); });
}); });
mr.addException(REGION_INFO.getRegionName(), IOE); mr.addException(REGION_INFO.getRegionName(), IOE);
return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 0, 9) { return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 0,
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 9, 0, null) {
@Override @Override
public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
int callTimeout) { int callTimeout) {

View File

@ -58,8 +58,8 @@ public class TestRpcRetryingCallerImpl {
long pauseMillis = 1; long pauseMillis = 1;
long specialPauseMillis = 2; long specialPauseMillis = 2;
RpcRetryingCallerImpl<Void> caller = RpcRetryingCallerImpl<Void> caller = new RpcRetryingCallerImpl<>(pauseMillis,
new RpcRetryingCallerImpl<>(pauseMillis, specialPauseMillis, 2, 0); specialPauseMillis, 2, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, 0, 0, null);
RetryingCallable<Void> callable = RetryingCallable<Void> callable =
new ThrowingCallable(CallQueueTooBigException.class, specialPauseMillis); new ThrowingCallable(CallQueueTooBigException.class, specialPauseMillis);

View File

@ -644,7 +644,8 @@ public class HRegionServer extends Thread
serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode); serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode);
rpcControllerFactory = RpcControllerFactory.instantiate(this.conf); rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf); rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf,
clusterConnection == null ? null : clusterConnection.getConnectionMetrics());
// login the zookeeper client principal (if using security) // login the zookeeper client principal (if using security)
ZKAuthentication.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE, ZKAuthentication.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,

View File

@ -390,8 +390,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
this.sink = sink; this.sink = sink;
this.connection = connection; this.connection = connection;
this.operationTimeout = operationTimeout; this.operationTimeout = operationTimeout;
this.rpcRetryingCallerFactory = this.rpcRetryingCallerFactory = RpcRetryingCallerFactory
RpcRetryingCallerFactory.instantiate(connection.getConfiguration()); .instantiate(connection.getConfiguration(), connection.getConnectionMetrics());
this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration()); this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
this.pool = pool; this.pool = pool;
this.tableDescriptors = tableDescriptors; this.tableDescriptors = tableDescriptors;

View File

@ -871,7 +871,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
List<LoadQueueItem> toRetry = new ArrayList<>(); List<LoadQueueItem> toRetry = new ArrayList<>();
try { try {
Configuration conf = getConf(); Configuration conf = getConf();
byte[] region = RpcRetryingCallerFactory.instantiate(conf, null).<byte[]> newCaller() byte[] region = RpcRetryingCallerFactory.instantiate(conf, null, null).<byte[]> newCaller()
.callWithRetries(serviceCallable, Integer.MAX_VALUE); .callWithRetries(serviceCallable, Integer.MAX_VALUE);
if (region == null) { if (region == null) {
LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first)

View File

@ -123,10 +123,11 @@ public class HConnectionTestingUtility {
NonceGenerator ng = Mockito.mock(NonceGenerator.class); NonceGenerator ng = Mockito.mock(NonceGenerator.class);
Mockito.when(c.getNonceGenerator()).thenReturn(ng); Mockito.when(c.getNonceGenerator()).thenReturn(ng);
AsyncProcess asyncProcess = new AsyncProcess(c, conf, AsyncProcess asyncProcess = new AsyncProcess(c, conf,
RpcRetryingCallerFactory.instantiate(conf), RpcControllerFactory.instantiate(conf)); RpcRetryingCallerFactory.instantiate(conf, c.getConnectionMetrics()),
RpcControllerFactory.instantiate(conf));
Mockito.when(c.getAsyncProcess()).thenReturn(asyncProcess); Mockito.when(c.getAsyncProcess()).thenReturn(asyncProcess);
Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(RpcRetryingCallerFactory Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(RpcRetryingCallerFactory
.instantiate(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null)); .instantiate(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, null));
Mockito.when(c.getRpcControllerFactory()).thenReturn(Mockito.mock(RpcControllerFactory.class)); Mockito.when(c.getRpcControllerFactory()).thenReturn(Mockito.mock(RpcControllerFactory.class));
Table t = Mockito.mock(Table.class); Table t = Mockito.mock(Table.class);
Mockito.when(c.getTable((TableName) Mockito.any())).thenReturn(t); Mockito.when(c.getTable((TableName) Mockito.any())).thenReturn(t);

View File

@ -209,8 +209,8 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
locations.getRegionLocation(1), locations.getRegionLocation(1).getRegionInfo(), row, locations.getRegionLocation(1), locations.getRegionLocation(1).getRegionInfo(), row,
Lists.newArrayList(entry), new AtomicLong()); Lists.newArrayList(entry), new AtomicLong());
RpcRetryingCallerFactory factory = RpcRetryingCallerFactory factory = RpcRetryingCallerFactory
RpcRetryingCallerFactory.instantiate(connection.getConfiguration()); .instantiate(connection.getConfiguration(), connection.getConnectionMetrics());
factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, 10000); factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, 10000);
} }
} }