HBASE-17114 Add an option to set special retry pause when encountering CallQueueTooBigException

This commit is contained in:
Yu Li 2016-12-01 13:38:13 +08:00
parent 15fe3d3279
commit 1f3c6f4c07
8 changed files with 147 additions and 17 deletions

View File

@ -210,6 +210,7 @@ class AsyncProcess {
*/ */
protected final int maxConcurrentTasksPerServer; protected final int maxConcurrentTasksPerServer;
protected final long pause; protected final long pause;
protected final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
protected int numTries; protected int numTries;
protected int serverTrackerTimeout; protected int serverTrackerTimeout;
protected int rpcTimeout; protected int rpcTimeout;
@ -234,6 +235,15 @@ class AsyncProcess {
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE); HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
if (configuredPauseForCQTBE < pause) {
LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
+ configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
+ ", will use " + pause + " instead.");
this.pauseForCQTBE = pause;
} else {
this.pauseForCQTBE = configuredPauseForCQTBE;
}
// how many times we could try in total, one more than retry number // how many times we could try in total, one more than retry number
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;

View File

@ -40,6 +40,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
@ -788,8 +789,15 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
// we go for one. // we go for one.
boolean retryImmediately = throwable instanceof RetryImmediatelyException; boolean retryImmediately = throwable instanceof RetryImmediatelyException;
int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1; int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1;
long backOffTime = retryImmediately ? 0 : long backOffTime;
errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pause); if (retryImmediately) {
backOffTime = 0;
} else if (throwable instanceof CallQueueTooBigException) {
// Give a special check on CQTBE, see #HBASE-17114
backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pauseForCQTBE);
} else {
backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pause);
}
if (numAttempt > asyncProcess.startLogErrorsCnt) { if (numAttempt > asyncProcess.startLogErrorsCnt) {
// We use this value to have some logs when we have multiple failures, but not too many // We use this value to have some logs when we have multiple failures, but not too many
// logs, as errors are to be expected when a region moves, splits and so on // logs, as errors are to be expected when a region moves, splits and so on

View File

@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
@ -114,6 +115,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
private final boolean hostnamesCanChange; private final boolean hostnamesCanChange;
private final long pause; private final long pause;
private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
private final boolean useMetaReplicas; private final boolean useMetaReplicas;
private final int numTries; private final int numTries;
final int rpcTimeout; final int rpcTimeout;
@ -193,6 +195,15 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
this.closed = false; this.closed = false;
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE); HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
if (configuredPauseForCQTBE < pause) {
LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
+ configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
+ ", will use " + pause + " instead.");
this.pauseForCQTBE = pause;
} else {
this.pauseForCQTBE = configuredPauseForCQTBE;
}
this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS, this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
HConstants.DEFAULT_USE_META_REPLICAS); HConstants.DEFAULT_USE_META_REPLICAS);
// how many times to try, one more than max *retry* time // how many times to try, one more than max *retry* time
@ -751,6 +762,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
} }
// Query the meta region // Query the meta region
long pauseBase = this.pause;
try { try {
Result regionInfoRow = null; Result regionInfoRow = null;
ReversedClientScanner rcs = null; ReversedClientScanner rcs = null;
@ -825,13 +837,17 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
if (e instanceof RemoteException) { if (e instanceof RemoteException) {
e = ((RemoteException)e).unwrapRemoteException(); e = ((RemoteException)e).unwrapRemoteException();
} }
if (e instanceof CallQueueTooBigException) {
// Give a special check on CallQueueTooBigException, see #HBASE-17114
pauseBase = this.pauseForCQTBE;
}
if (tries < maxAttempts - 1) { if (tries < maxAttempts - 1) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("locateRegionInMeta parentTable=" + LOG.debug("locateRegionInMeta parentTable=" +
TableName.META_TABLE_NAME + ", metaLocation=" + TableName.META_TABLE_NAME + ", metaLocation=" +
", attempt=" + tries + " of " + ", attempt=" + tries + " of " +
maxAttempts + " failed; retrying after sleep of " + maxAttempts + " failed; retrying after sleep of " +
ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage()); ConnectionUtils.getPauseTime(pauseBase, tries) + " because: " + e.getMessage());
} }
} else { } else {
throw e; throw e;
@ -843,7 +859,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
} }
} }
try{ try{
Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries)); Thread.sleep(ConnectionUtils.getPauseTime(pauseBase, tries));
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new InterruptedIOException("Giving up trying to location region in " + throw new InterruptedIOException("Giving up trying to location region in " +
"meta: thread is interrupted."); "meta: thread is interrupted.");

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.ReflectionUtils;
@ -30,8 +32,10 @@ public class RpcRetryingCallerFactory {
/** Configuration key for a custom {@link RpcRetryingCaller} */ /** Configuration key for a custom {@link RpcRetryingCaller} */
public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class"; public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class";
private static final Log LOG = LogFactory.getLog(RpcRetryingCallerFactory.class);
protected final Configuration conf; protected final Configuration conf;
private final long pause; private final long pause;
private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
private final int retries; private final int retries;
private final int rpcTimeout; private final int rpcTimeout;
private final RetryingCallerInterceptor interceptor; private final RetryingCallerInterceptor interceptor;
@ -48,6 +52,15 @@ public class RpcRetryingCallerFactory {
this.conf = conf; this.conf = conf;
pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE); HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
if (configuredPauseForCQTBE < pause) {
LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
+ configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
+ ", will use " + pause + " instead.");
this.pauseForCQTBE = pause;
} else {
this.pauseForCQTBE = configuredPauseForCQTBE;
}
retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
@ -71,8 +84,8 @@ public class RpcRetryingCallerFactory {
public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) { public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) {
// We store the values in the factory instance. This way, constructing new objects // We store the values in the factory instance. This way, constructing new objects
// is cheap as it does not require parsing a complex structure. // is cheap as it does not require parsing a complex structure.
RpcRetryingCaller<T> caller = new RpcRetryingCallerImpl<T>(pause, retries, interceptor, RpcRetryingCaller<T> caller = new RpcRetryingCallerImpl<T>(pause, pauseForCQTBE, retries,
startLogErrorsCnt, rpcTimeout); interceptor, startLogErrorsCnt, rpcTimeout);
return caller; return caller;
} }
@ -82,8 +95,8 @@ public class RpcRetryingCallerFactory {
public <T> RpcRetryingCaller<T> newCaller() { public <T> RpcRetryingCaller<T> newCaller() {
// We store the values in the factory instance. This way, constructing new objects // We store the values in the factory instance. This way, constructing new objects
// is cheap as it does not require parsing a complex structure. // is cheap as it does not require parsing a complex structure.
RpcRetryingCaller<T> caller = new RpcRetryingCallerImpl<T>(pause, retries, interceptor, RpcRetryingCaller<T> caller = new RpcRetryingCallerImpl<T>(pause, pauseForCQTBE, retries,
startLogErrorsCnt, rpcTimeout); interceptor, startLogErrorsCnt, rpcTimeout);
return caller; return caller;
} }

View File

@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
@ -57,6 +58,7 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
private final int startLogErrorsCnt; private final int startLogErrorsCnt;
private final long pause; private final long pause;
private final long pauseForCQTBE;
private final int maxAttempts;// how many times to try private final int maxAttempts;// how many times to try
private final int rpcTimeout;// timeout for each rpc request private final int rpcTimeout;// timeout for each rpc request
private final AtomicBoolean cancelled = new AtomicBoolean(false); private final AtomicBoolean cancelled = new AtomicBoolean(false);
@ -64,13 +66,15 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
private final RetryingCallerInterceptorContext context; private final RetryingCallerInterceptorContext context;
private final RetryingTimeTracker tracker; private final RetryingTimeTracker tracker;
public RpcRetryingCallerImpl(long pause, int retries, int startLogErrorsCnt) { public RpcRetryingCallerImpl(long pause, long pauseForCQTBE, int retries, int startLogErrorsCnt) {
this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0); this(pause, pauseForCQTBE, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR,
startLogErrorsCnt, 0);
} }
public RpcRetryingCallerImpl(long pause, int retries, public RpcRetryingCallerImpl(long pause, long pauseForCQTBE, int retries,
RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) { RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) {
this.pause = pause; this.pause = pause;
this.pauseForCQTBE = pauseForCQTBE;
this.maxAttempts = retries2Attempts(retries); this.maxAttempts = retries2Attempts(retries);
this.interceptor = interceptor; this.interceptor = interceptor;
context = interceptor.createEmptyContext(); context = interceptor.createEmptyContext();
@ -126,9 +130,11 @@ public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
throw new RetriesExhaustedException(tries, exceptions); throw new RetriesExhaustedException(tries, exceptions);
} }
// If the server is dead, we need to wait a little before retrying, to give // If the server is dead, we need to wait a little before retrying, to give
// a chance to the regions to be // a chance to the regions to be moved
// get right pause time, start by RETRY_BACKOFF[0] * pause // get right pause time, start by RETRY_BACKOFF[0] * pauseBase, where pauseBase might be
expectedSleep = callable.sleep(pause, tries); // special when encountering CallQueueTooBigException, see #HBASE-17114
long pauseBase = (t instanceof CallQueueTooBigException) ? pauseForCQTBE : pause;
expectedSleep = callable.sleep(pauseBase, tries);
// If, after the planned sleep, there won't be enough time left, we stop now. // If, after the planned sleep, there won't be enough time left, we stop now.
long duration = singleCallDuration(expectedSleep); long duration = singleCallDuration(expectedSleep);

View File

@ -236,7 +236,7 @@ public class TestAsyncProcess {
} }
}); });
return new RpcRetryingCallerImpl<AbstractResponse>(100, 10, 9) { return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 9) {
@Override @Override
public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
int callTimeout) int callTimeout)
@ -280,7 +280,7 @@ public class TestAsyncProcess {
private final IOException e; private final IOException e;
public CallerWithFailure(IOException e) { public CallerWithFailure(IOException e) {
super(100, 100, 9); super(100, 500, 100, 9);
this.e = e; this.e = e;
} }
@ -386,7 +386,7 @@ public class TestAsyncProcess {
replicaCalls.incrementAndGet(); replicaCalls.incrementAndGet();
} }
return new RpcRetryingCallerImpl<AbstractResponse>(100, 10, 9) { return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 10, 9) {
@Override @Override
public MultiResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable, public MultiResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
int callTimeout) int callTimeout)
@ -1730,4 +1730,68 @@ public class TestAsyncProcess {
} }
t.join(); t.join();
} }
/**
 * Test and make sure we could use a special pause setting when retry with
 * CallQueueTooBigException, see HBASE-17114
 * @throws Exception if unexpected error happened during test
 */
@Test
public void testRetryPauseWithCallQueueTooBigException() throws Exception {
Configuration myConf = new Configuration(conf);
final long specialPause = 500L;
final int retries = 1;
// Configure a CQTBE-specific pause larger than the default client pause (100ms),
// and a single retry so total expected sleep time is easy to bound.
myConf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, specialPause);
myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
ClusterConnection conn = new MyConnectionImpl(myConf);
BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
// Every call through this AsyncProcess fails with CallQueueTooBigException,
// forcing the retry path that should pick up pauseForCQTBE.
AsyncProcessWithFailure ap =
new AsyncProcessWithFailure(conn, myConf, new CallQueueTooBigException());
mutator.ap = ap;
Assert.assertNotNull(mutator.ap.createServerErrorTracker());
Put p = createPut(1, true);
mutator.mutate(p);
long startTime = System.currentTimeMillis();
try {
mutator.flush();
Assert.fail();
} catch (RetriesExhaustedWithDetailsException expected) {
// expected: all retries fail with CQTBE
}
long actualSleep = System.currentTimeMillis() - startTime;
long expectedSleep = 0L;
for (int i = 0; i < retries; i++) {
expectedSleep += ConnectionUtils.getPauseTime(specialPause, i);
// Prevent jitter in ConnectionUtils#getPauseTime from affecting the result:
// getPauseTime may subtract up to 1% random jitter, so widen actualSleep by
// the same margin before comparing against the un-jittered expectation.
actualSleep += (long) (specialPause * 0.01f);
}
LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms");
// With the special pause in effect the client must have slept at least the
// CQTBE-based backoff total; the normal 100ms pause would be far shorter.
Assert.assertTrue("Expected to sleep " + expectedSleep + " but actually " + actualSleep + "ms",
actualSleep >= expectedSleep);
// check and confirm normal IOE will use the normal pause
final long normalPause =
myConf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
ap = new AsyncProcessWithFailure(conn, myConf, new IOException());
mutator.ap = ap;
Assert.assertNotNull(mutator.ap.createServerErrorTracker());
mutator.mutate(p);
startTime = System.currentTimeMillis();
try {
mutator.flush();
Assert.fail();
} catch (RetriesExhaustedWithDetailsException expected) {
// expected: all retries fail with the plain IOException
}
actualSleep = System.currentTimeMillis() - startTime;
expectedSleep = 0L;
for (int i = 0; i < retries; i++) {
expectedSleep += ConnectionUtils.getPauseTime(normalPause, i);
}
// plus an additional pause to balance the program execution time
// (upper-bound slack; sleeping longer than this would indicate the
// CQTBE pause was wrongly applied to a plain IOException)
expectedSleep += normalPause;
LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms");
Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep);
}
} }

View File

@ -728,6 +728,11 @@ public final class HConstants {
*/ */
public static final long DEFAULT_HBASE_CLIENT_PAUSE = 100; public static final long DEFAULT_HBASE_CLIENT_PAUSE = 100;
/**
* Parameter name for client pause value for special case such as call queue too big, etc.
*/
public static final String HBASE_CLIENT_PAUSE_FOR_CQTBE = "hbase.client.pause.cqtbe";
/** /**
* The maximum number of concurrent connections the client will maintain. * The maximum number of concurrent connections the client will maintain.
*/ */

View File

@ -457,6 +457,14 @@ possible configurations would overwhelm and obscure the important.
See hbase.client.retries.number for description of how we backoff from See hbase.client.retries.number for description of how we backoff from
this initial pause amount and how this pause works w/ retries.</description> this initial pause amount and how this pause works w/ retries.</description>
</property> </property>
<property>
<name>hbase.client.pause.cqtbe</name>
<value></value>
<description>A special client pause (in milliseconds) used instead of
hbase.client.pause when the client receives CallQueueTooBigException
(cqtbe) from a server. Set this property to a higher value than
hbase.client.pause if you observe frequent CQTBE from the same
RegionServer and the call queue there keeps full</description>
</property>
<property> <property>
<name>hbase.client.retries.number</name> <name>hbase.client.retries.number</name>
<value>35</value> <value>35</value>