HBASE-17114 Add an option to set special retry pause when encountering CallQueueTooBigException

Yu Li 2016-12-01 14:45:12 +08:00
parent c8ea82299c
commit 4068a8b4d4
8 changed files with 149 additions and 19 deletions

View File

@ -47,6 +47,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@ -257,6 +258,7 @@ class AsyncProcess {
*/
protected final int maxConcurrentTasksPerServer;
protected final long pause;
protected final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
protected int numTries;
protected int serverTrackerTimeout;
protected int rpcTimeout;
@ -321,6 +323,15 @@ class AsyncProcess {
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
if (configuredPauseForCQTBE < pause) {
LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
+ configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
+ ", will use " + pause + " instead.");
this.pauseForCQTBE = pause;
} else {
this.pauseForCQTBE = configuredPauseForCQTBE;
}
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.rpcTimeout = rpcTimeout;
@ -1313,8 +1324,15 @@ class AsyncProcess {
// we go for one.
boolean retryImmediately = throwable instanceof RetryImmediatelyException;
int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1;
long backOffTime = retryImmediately ? 0 :
errorsByServer.calculateBackoffTime(oldServer, pause);
long backOffTime;
if (retryImmediately) {
backOffTime = 0;
} else if (throwable instanceof CallQueueTooBigException) {
// Give a special check on CQTBE, see #HBASE-17114
backOffTime = errorsByServer.calculateBackoffTime(oldServer, pauseForCQTBE);
} else {
backOffTime = errorsByServer.calculateBackoffTime(oldServer, pause);
}
if (numAttempt > startLogErrorsCnt) {
// We use this value to have some logs when we have multiple failures, but not too many
// logs, as errors are to be expected when a region moves, splits and so on
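
The read-and-clamp block above is repeated verbatim in ConnectionManager and RpcRetryingCallerFactory further down. A minimal standalone sketch of that logic, for reference only (the helper class and method names are illustrative, not part of the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;

final class CqtbePauseResolver {
  // Resolve the pause used to seed CQTBE retries: fall back to the regular
  // client pause when hbase.client.pause.cqtbe is unset or configured lower.
  static long resolvePauseForCQTBE(Configuration conf) {
    long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
    long configuredPauseForCQTBE =
        conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
    return configuredPauseForCQTBE < pause ? pause : configuredPauseForCQTBE;
  }
}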

View File

@ -48,6 +48,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
@ -557,6 +558,7 @@ class ConnectionManager {
static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
private final boolean hostnamesCanChange;
private final long pause;
private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
private final boolean useMetaReplicas;
private final int numTries;
final int rpcTimeout;
@ -649,6 +651,15 @@ class ConnectionManager {
this.closed = false;
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
if (configuredPauseForCQTBE < pause) {
LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
+ configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
+ ", will use " + pause + " instead.");
this.pauseForCQTBE = pause;
} else {
this.pauseForCQTBE = configuredPauseForCQTBE;
}
this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
HConstants.DEFAULT_USE_META_REPLICAS);
this.numTries = connectionConfig.getRetriesNumber();
@ -1272,6 +1283,7 @@ class ConnectionManager {
}
// Query the meta region
long pauseBase = this.pause;
try {
Result regionInfoRow = null;
ReversedClientScanner rcs = null;
@ -1346,13 +1358,17 @@ class ConnectionManager {
if (e instanceof RemoteException) {
e = ((RemoteException)e).unwrapRemoteException();
}
if (e instanceof CallQueueTooBigException) {
// Give a special check on CallQueueTooBigException, see #HBASE-17114
pauseBase = this.pauseForCQTBE;
}
if (tries < localNumRetries - 1) {
if (LOG.isDebugEnabled()) {
LOG.debug("locateRegionInMeta parentTable=" +
TableName.META_TABLE_NAME + ", metaLocation=" +
", attempt=" + tries + " of " +
localNumRetries + " failed; retrying after sleep of " +
ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
ConnectionUtils.getPauseTime(pauseBase, tries) + " because: " + e.getMessage());
}
} else {
throw e;
@ -1364,7 +1380,7 @@ class ConnectionManager {
}
}
try{
Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
Thread.sleep(ConnectionUtils.getPauseTime(pauseBase, tries));
} catch (InterruptedException e) {
throw new InterruptedIOException("Giving up trying to locate region in " +
"meta: thread is interrupted.");

View File

@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -62,19 +63,22 @@ public class RpcRetryingCaller<T> {
private final int startLogErrorsCnt;
private final long pause;
private final long pauseForCQTBE;
private final int retries;
private final int rpcTimeout;// timeout for each rpc request
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private final RetryingCallerInterceptor interceptor;
private final RetryingCallerInterceptorContext context;
public RpcRetryingCaller(long pause, int retries, int startLogErrorsCnt) {
this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0);
public RpcRetryingCaller(long pause, long pauseForCQTBE, int retries, int startLogErrorsCnt) {
this(pause, pauseForCQTBE, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR,
startLogErrorsCnt, 0);
}
public RpcRetryingCaller(long pause, int retries,
public RpcRetryingCaller(long pause, long pauseForCQTBE, int retries,
RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) {
this.pause = pause;
this.pauseForCQTBE = pauseForCQTBE;
this.retries = retries;
this.interceptor = interceptor;
context = interceptor.createEmptyContext();
@ -159,9 +163,11 @@ public class RpcRetryingCaller<T> {
throw new RetriesExhaustedException(tries, exceptions);
}
// If the server is dead, we need to wait a little before retrying, to give
// a chance to the regions to be
// get right pause time, start by RETRY_BACKOFF[0] * pause
expectedSleep = callable.sleep(pause, tries);
// a chance to the regions to be moved
// get right pause time, start by RETRY_BACKOFF[0] * pauseBase, where pauseBase might be
// special when encountering CallQueueTooBigException, see #HBASE-17114
long pauseBase = (t instanceof CallQueueTooBigException) ? pauseForCQTBE : pause;
expectedSleep = callable.sleep(pauseBase, tries);
// If, after the planned sleep, there won't be enough time left, we stop now.
long duration = singleCallDuration(expectedSleep);
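
The same exception check appears in AsyncProcess above (the batched multi-request path) and here in RpcRetryingCaller (the single-operation path), so both kinds of client retries honor the new setting. Reduced to its essence (a condensed sketch, not the caller's actual control flow):

import org.apache.hadoop.hbase.CallQueueTooBigException;

final class PauseSelectionSketch {
  // The behavioral change per retry attempt: which pause value seeds the backoff.
  static long choosePauseBase(Throwable t, long pause, long pauseForCQTBE) {
    // An overloaded call queue warrants a longer, separately tunable backoff.
    return (t instanceof CallQueueTooBigException) ? pauseForCQTBE : pause;
  }
}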

View File

@ -17,9 +17,11 @@
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ReflectionUtils;
/**
@ -30,8 +32,10 @@ public class RpcRetryingCallerFactory {
/** Configuration key for a custom {@link RpcRetryingCaller} */
public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class";
private static final Log LOG = LogFactory.getLog(RpcRetryingCallerFactory.class);
protected final Configuration conf;
private final long pause;
private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
private final int retries;
private final int rpcTimeout;
private final RetryingCallerInterceptor interceptor;
@ -47,6 +51,15 @@ public class RpcRetryingCallerFactory {
this.conf = conf;
pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
if (configuredPauseForCQTBE < pause) {
LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
+ configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
+ ", will use " + pause + " instead.");
this.pauseForCQTBE = pause;
} else {
this.pauseForCQTBE = configuredPauseForCQTBE;
}
retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY,
@ -70,8 +83,8 @@ public class RpcRetryingCallerFactory {
public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) {
// We store the values in the factory instance. This way, constructing new objects
// is cheap as it does not require parsing a complex structure.
RpcRetryingCaller<T> caller = new RpcRetryingCaller<T>(pause, retries, interceptor,
startLogErrorsCnt, rpcTimeout);
RpcRetryingCaller<T> caller = new RpcRetryingCaller<T>(pause, pauseForCQTBE, retries,
interceptor, startLogErrorsCnt, rpcTimeout);
return caller;
}
@ -81,8 +94,8 @@ public class RpcRetryingCallerFactory {
public <T> RpcRetryingCaller<T> newCaller() {
// We store the values in the factory instance. This way, constructing new objects
// is cheap as it does not require parsing a complex structure.
RpcRetryingCaller<T> caller = new RpcRetryingCaller<T>(pause, retries, interceptor,
startLogErrorsCnt, rpcTimeout);
RpcRetryingCaller<T> caller = new RpcRetryingCaller<T>(pause, pauseForCQTBE, retries,
interceptor, startLogErrorsCnt, rpcTimeout);
return caller;
}
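
Call sites that build callers through the factory need no changes; the factory reads both pause settings from the Configuration once and passes them to every caller it hands out. A usage sketch — it assumes the factory's static instantiate(Configuration) entry point, and the 500ms value is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;

public class CqtbeCallerFactoryUsage {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Illustrative value: CQTBE-triggered retries back off from 500ms
    // instead of the 100ms default base pause.
    conf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, 500L);
    RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf);
    RpcRetryingCaller<Result> caller = factory.newCaller();
    // caller.callWithRetries(...) will now sleep longer between attempts
    // whenever the failure cause is a CallQueueTooBigException.
  }
}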

View File

@ -238,7 +238,7 @@ public class TestAsyncProcess {
}
});
return new RpcRetryingCaller<MultiResponse>(100, 10, 9) {
return new RpcRetryingCaller<MultiResponse>(100, 500, 10, 9) {
@Override
public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
int callTimeout)
@ -261,7 +261,7 @@ public class TestAsyncProcess {
private final IOException e;
public CallerWithFailure(IOException e) {
super(100, 100, 9);
super(100, 500, 100, 9);
this.e = e;
}
@ -366,7 +366,7 @@ public class TestAsyncProcess {
replicaCalls.incrementAndGet();
}
return new RpcRetryingCaller<MultiResponse>(100, 10, 9) {
return new RpcRetryingCaller<MultiResponse>(100, 500, 10, 9) {
@Override
public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
int callTimeout)
@ -1695,4 +1695,68 @@ public class TestAsyncProcess {
}
t.join();
}
/**
* Test and make sure we can use a special pause setting when retrying on
* CallQueueTooBigException, see HBASE-17114
* @throws Exception if unexpected error happened during test
*/
@Test
public void testRetryPauseWithCallQueueTooBigException() throws Exception {
Configuration myConf = new Configuration(conf);
final long specialPause = 500L;
final int tries = 2;
myConf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, specialPause);
myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, tries);
ClusterConnection conn = new MyConnectionImpl(myConf);
BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
AsyncProcessWithFailure ap =
new AsyncProcessWithFailure(conn, myConf, new CallQueueTooBigException());
mutator.ap = ap;
Assert.assertNotNull(mutator.ap.createServerErrorTracker());
Put p = createPut(1, true);
mutator.mutate(p);
long startTime = System.currentTimeMillis();
try {
mutator.flush();
Assert.fail();
} catch (RetriesExhaustedWithDetailsException expected) {
}
long actualSleep = System.currentTimeMillis() - startTime;
long expectedSleep = 0L;
for (int i = 0; i < tries - 1; i++) {
expectedSleep += ConnectionUtils.getPauseTime(specialPause, i);
// Prevent jitter in ConnectionUtils#getPauseTime from affecting the result
actualSleep += (long) (specialPause * 0.01f);
}
LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms");
Assert.assertTrue("Expected to sleep " + expectedSleep + " but actually " + actualSleep + "ms",
actualSleep >= expectedSleep);
// check and confirm normal IOE will use the normal pause
final long normalPause =
myConf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
ap = new AsyncProcessWithFailure(conn, myConf, new IOException());
mutator.ap = ap;
Assert.assertNotNull(mutator.ap.createServerErrorTracker());
mutator.mutate(p);
startTime = System.currentTimeMillis();
try {
mutator.flush();
Assert.fail();
} catch (RetriesExhaustedWithDetailsException expected) {
}
actualSleep = System.currentTimeMillis() - startTime;
expectedSleep = 0L;
for (int i = 0; i < tries - 1; i++) {
expectedSleep += ConnectionUtils.getPauseTime(normalPause, i);
}
// plus an additional pause to balance the program execution time
expectedSleep += normalPause;
LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms");
Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep);
}
}

View File

@ -532,7 +532,7 @@ public class TestClientScanner {
@Override
public <T> RpcRetryingCaller<T> newCaller() {
return new RpcRetryingCaller<T>(0, 0, 0) {
return new RpcRetryingCaller<T>(0, 0, 0, 0) {
@Override
public void cancel() {
}

View File

@ -713,6 +713,11 @@ public final class HConstants {
*/
public static final long DEFAULT_HBASE_CLIENT_PAUSE = 100;
/**
* Parameter name for client pause value for special case such as call queue too big, etc.
*/
public static final String HBASE_CLIENT_PAUSE_FOR_CQTBE = "hbase.client.pause.cqtbe";
/**
* The maximum number of concurrent connections the client will maintain.
*/

View File

@ -480,6 +480,14 @@ possible configurations would overwhelm and obscure the important.
See hbase.client.retries.number for description of how we backoff from
this initial pause amount and how this pause works w/ retries.</description>
</property>
<property>
<name>hbase.client.pause.cqtbe</name>
<value></value>
<description>The client pause to apply when an operation fails with
CallQueueTooBigException (cqtbe). Set this property to a higher value
than hbase.client.pause if you observe frequent CQTBE from the same
RegionServer and its call queue keeps filling up.</description>
</property>
<property>
<name>hbase.client.retries.number</name>
<value>35</value>
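
Operators would typically set the new property in hbase-site.xml alongside hbase.client.pause; a sketch with an illustrative value:

<!-- hbase-site.xml: back off from 500ms when retrying on CQTBE, while
     ordinary retries keep the default 100ms base pause. -->
<property>
  <name>hbase.client.pause.cqtbe</name>
  <value>500</value>
</property>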