HBASE-12432 RpcRetryingCaller should log after fixed number of retries like AsyncProcess
Signed-off-by: Andrew Purtell <apurtell@apache.org>
parent 440767ff55
commit fb1af86ee1
@@ -96,6 +96,16 @@ class AsyncProcess {
   public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";

+  /**
+   * Configure the number of failures after which the client will start logging. A few failures
+   * is fine: region moved, then is not opened, then is overloaded. We try to have an acceptable
+   * heuristic for the number of errors we don't log. 9 was chosen because we wait for 1s at
+   * this stage.
+   */
+  public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
+      "hbase.client.start.log.errors.counter";
+  public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9;
+
   /**
    * The context used to wait for results from one submit call.
    * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts),
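The new key makes the "start logging after N failures" threshold configurable on the client side. Below is a minimal sketch of overriding it programmatically, assuming an ordinary HBaseConfiguration-based setup; the class name StartLogErrorsExample and the value 0 are illustrative, not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class StartLogErrorsExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Log every failed retry instead of staying silent for the first 9.
    conf.setInt("hbase.client.start.log.errors.counter", 0);
    // Clients built from this Configuration pick the value up through
    // RpcRetryingCallerFactory (see the factory hunk further down).
    System.out.println(conf.getInt("hbase.client.start.log.errors.counter", 9));
  }
}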
@@ -255,10 +265,8 @@ class AsyncProcess {
     this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
       HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);

-    // A few failure is fine: region moved, then is not opened, then is overloaded. We try
-    // to have an acceptable heuristic for the number of errors we don't log.
-    // 9 was chosen because we wait for 1s at this stage.
-    this.startLogErrorsCnt = conf.getInt("hbase.client.start.log.errors.counter", 9);
+    this.startLogErrorsCnt =
+        conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);

     if (this.maxTotalConcurrentTasks <= 0) {
       throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
@@ -58,6 +58,8 @@ public class RpcRetryingCaller<T> {
    * Start and end times for a single call.
    */
   private final static int MIN_RPC_TIMEOUT = 2000;
+  /** How many retries are allowed before we start to log */
+  private final int startLogErrorsCnt;

   private final long pause;
   private final int retries;
@@ -65,16 +67,17 @@ public class RpcRetryingCaller<T> {
   private final RetryingCallerInterceptor interceptor;
   private final RetryingCallerInterceptorContext context;

-  public RpcRetryingCaller(long pause, int retries) {
-    this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);
+  public RpcRetryingCaller(long pause, int retries, int startLogErrorsCnt) {
+    this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt);
   }

   public RpcRetryingCaller(long pause, int retries,
-      RetryingCallerInterceptor interceptor) {
+      RetryingCallerInterceptor interceptor, int startLogErrorsCnt) {
     this.pause = pause;
     this.retries = retries;
     this.interceptor = interceptor;
     context = interceptor.createEmptyContext();
+    this.startLogErrorsCnt = startLogErrorsCnt;
   }

   private int getRemainingTime(int callTimeout) {
@@ -125,10 +128,11 @@ public class RpcRetryingCaller<T> {
         throw e;
       } catch (Throwable t) {
         ExceptionUtil.rethrowIfInterrupt(t);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Call exception, tries=" + tries + ", retries=" + retries + ", started=" +
+        if (tries > startLogErrorsCnt) {
+          LOG.info("Call exception, tries=" + tries + ", retries=" + retries + ", started=" +
               (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + " ms ago, "
-              + "cancelled=" + cancelled.get(), t);
+              + "cancelled=" + cancelled.get() + ", msg="
+              + callable.getExceptionMessageAdditionalDetail());
         }

         // translateException throws exception when should not retry: i.e. when request is bad.
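After this hunk, retries at or below startLogErrorsCnt stay quiet and later ones are reported at INFO with the callable's extra detail, instead of only appearing when TRACE logging is enabled. The following standalone sketch shows the same pattern, with a plain loop and System.out standing in for the real retry loop and commons-logging; the names here are illustrative only.

public class RetryLoggingSketch {
  // Mirrors DEFAULT_START_LOG_ERRORS_AFTER_COUNT from the AsyncProcess hunk above.
  private static final int START_LOG_ERRORS_AFTER_COUNT = 9;

  public static void main(String[] args) {
    for (int tries = 0; tries < 12; tries++) {
      try {
        throw new RuntimeException("simulated call failure");
      } catch (Throwable t) {
        // A few early failures are expected (region moved, not opened yet, overloaded),
        // so only start reporting once the threshold is crossed.
        if (tries > START_LOG_ERRORS_AFTER_COUNT) {
          System.out.println("Call exception, tries=" + tries + ", msg=" + t.getMessage());
        }
      }
    }
  }
}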
@@ -32,6 +32,7 @@ public class RpcRetryingCallerFactory {
   private final long pause;
   private final int retries;
   private final RetryingCallerInterceptor interceptor;
+  private final int startLogErrorsCnt;

   public RpcRetryingCallerFactory(Configuration conf) {
     this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);
@@ -43,13 +44,15 @@ public class RpcRetryingCallerFactory {
         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
     retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+    startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
+        AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
     this.interceptor = interceptor;
   }

   public <T> RpcRetryingCaller<T> newCaller() {
     // We store the values in the factory instance. This way, constructing new objects
     // is cheap as it does not require parsing a complex structure.
-    return new RpcRetryingCaller<T>(pause, retries, interceptor);
+    return new RpcRetryingCaller<T>(pause, retries, interceptor, startLogErrorsCnt);
   }

   public static RpcRetryingCallerFactory instantiate(Configuration configuration) {
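The factory is now the single place that reads the key, so every caller it builds shares the same threshold. A hedged usage sketch follows; RpcRetryingCallerFactory is an HBase-internal class rather than a supported public API, and the override value 3 and class name FactoryWiringSketch are only illustrations.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;

public class FactoryWiringSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    conf.setInt("hbase.client.start.log.errors.counter", 3);
    // instantiate(...) and newCaller() are the methods visible in the hunks above;
    // each caller produced here carries startLogErrorsCnt = 3.
    RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf);
    RpcRetryingCaller<Void> caller = factory.newCaller();
    System.out.println(caller != null);
  }
}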
@@ -190,7 +190,7 @@ public class TestAsyncProcess {
         }
       });

-      return new RpcRetryingCaller<MultiResponse>(100, 10) {
+      return new RpcRetryingCaller<MultiResponse>(100, 10, 9) {
        @Override
        public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
            int callTimeout)
@@ -211,7 +211,7 @@ public class TestAsyncProcess {
   static class CallerWithFailure extends RpcRetryingCaller<MultiResponse>{

     public CallerWithFailure() {
-      super(100, 100);
+      super(100, 100, 9);
     }

     @Override
@@ -294,7 +294,7 @@ public class TestAsyncProcess {
         replicaCalls.incrementAndGet();
       }

-      return new RpcRetryingCaller<MultiResponse>(100, 10) {
+      return new RpcRetryingCaller<MultiResponse>(100, 10, 9) {
        @Override
        public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
        throws IOException, RuntimeException {
@@ -564,7 +564,7 @@ public class TestFastFailWithoutTestUtil {

   public RpcRetryingCaller<Void> getRpcRetryingCaller(int pauseTime,
       int retries, RetryingCallerInterceptor interceptor) {
-    return new RpcRetryingCaller<Void>(pauseTime, retries, interceptor) {
+    return new RpcRetryingCaller<Void>(pauseTime, retries, interceptor, 9) {
       @Override
       public Void callWithRetries(RetryingCallable<Void> callable,
           int callTimeout) throws IOException, RuntimeException {