diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index e653c80ed52..868295dfe01 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -210,6 +210,7 @@ class AsyncProcess { */ protected final int maxConcurrentTasksPerServer; protected final long pause; + protected final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified protected int numTries; protected int serverTrackerTimeout; protected int rpcTimeout; @@ -234,6 +235,15 @@ class AsyncProcess { this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause); + if (configuredPauseForCQTBE < pause) { + LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: " + + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE + + ", will use " + pause + " instead."); + this.pauseForCQTBE = pause; + } else { + this.pauseForCQTBE = configuredPauseForCQTBE; + } // how many times we could try in total, one more than retry number this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java index 6b6b99aa4f5..d176ce1d8e2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java @@ -40,6 +40,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; @@ -788,8 +789,15 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { // we go for one. boolean retryImmediately = throwable instanceof RetryImmediatelyException; int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1; - long backOffTime = retryImmediately ? 0 : - errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pause); + long backOffTime; + if (retryImmediately) { + backOffTime = 0; + } else if (throwable instanceof CallQueueTooBigException) { + // Give a special check on CQTBE, see #HBASE-17114 + backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pauseForCQTBE); + } else { + backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pause); + } if (numAttempt > asyncProcess.startLogErrorsCnt) { // We use this value to have some logs when we have multiple failures, but not too many // logs, as errors are to be expected when a region moves, splits and so on diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 11349230a19..96452f9cd8d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -114,6 +115,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { private final boolean hostnamesCanChange; private final long pause; + private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified private final boolean useMetaReplicas; private final int numTries; final int rpcTimeout; @@ -193,6 +195,15 @@ class ConnectionImplementation implements ClusterConnection, Closeable { this.closed = false; this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause); + if (configuredPauseForCQTBE < pause) { + LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: " + + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE + + ", will use " + pause + " instead."); + this.pauseForCQTBE = pause; + } else { + this.pauseForCQTBE = configuredPauseForCQTBE; + } this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS, HConstants.DEFAULT_USE_META_REPLICAS); // how many times to try, one more than max *retry* time @@ -751,6 +762,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } // Query the meta region + long pauseBase = this.pause; try { Result regionInfoRow = null; ReversedClientScanner rcs = null; @@ -825,13 +837,17 @@ class ConnectionImplementation implements ClusterConnection, Closeable { if (e instanceof RemoteException) { e = ((RemoteException)e).unwrapRemoteException(); } + if (e instanceof CallQueueTooBigException) { + // Give a special check on CallQueueTooBigException, see #HBASE-17114 + pauseBase = this.pauseForCQTBE; + } if (tries < maxAttempts - 1) { if (LOG.isDebugEnabled()) { LOG.debug("locateRegionInMeta parentTable=" + TableName.META_TABLE_NAME + ", metaLocation=" + ", attempt=" + tries + " of " + maxAttempts + " failed; retrying after sleep of " + - ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage()); + ConnectionUtils.getPauseTime(pauseBase, tries) + " because: " + e.getMessage()); } } else { throw e; @@ -843,7 +859,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } } try{ - Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries)); + Thread.sleep(ConnectionUtils.getPauseTime(pauseBase, tries)); } catch (InterruptedException e) { throw new InterruptedIOException("Giving up trying to location region in " + "meta: thread is interrupted."); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index f92aeae0df7..cc8c23ae73c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.client; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -30,8 +32,10 @@ public class RpcRetryingCallerFactory { /** Configuration key for a custom {@link RpcRetryingCaller} */ public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class"; + private static final Log LOG = LogFactory.getLog(RpcRetryingCallerFactory.class); protected final Configuration conf; private final long pause; + private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified private final int retries; private final int rpcTimeout; private final RetryingCallerInterceptor interceptor; @@ -48,6 +52,15 @@ public class RpcRetryingCallerFactory { this.conf = conf; pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause); + if (configuredPauseForCQTBE < pause) { + LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: " + + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE + + ", will use " + pause + " instead."); + this.pauseForCQTBE = pause; + } else { + this.pauseForCQTBE = configuredPauseForCQTBE; + } retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, @@ -71,8 +84,8 @@ public class RpcRetryingCallerFactory { public RpcRetryingCaller newCaller(int rpcTimeout) { // We store the values in the factory instance. This way, constructing new objects // is cheap as it does not require parsing a complex structure. - RpcRetryingCaller caller = new RpcRetryingCallerImpl(pause, retries, interceptor, - startLogErrorsCnt, rpcTimeout); + RpcRetryingCaller caller = new RpcRetryingCallerImpl(pause, pauseForCQTBE, retries, + interceptor, startLogErrorsCnt, rpcTimeout); return caller; } @@ -82,8 +95,8 @@ public class RpcRetryingCallerFactory { public RpcRetryingCaller newCaller() { // We store the values in the factory instance. This way, constructing new objects // is cheap as it does not require parsing a complex structure. - RpcRetryingCaller caller = new RpcRetryingCallerImpl(pause, retries, interceptor, - startLogErrorsCnt, rpcTimeout); + RpcRetryingCaller caller = new RpcRetryingCallerImpl(pause, pauseForCQTBE, retries, + interceptor, startLogErrorsCnt, rpcTimeout); return caller; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java index 91a20ecbb44..6450adfc58d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; @@ -57,6 +58,7 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { private final int startLogErrorsCnt; private final long pause; + private final long pauseForCQTBE; private final int maxAttempts;// how many times to try private final int rpcTimeout;// timeout for each rpc request private final AtomicBoolean cancelled = new AtomicBoolean(false); @@ -64,13 +66,15 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { private final RetryingCallerInterceptorContext context; private final RetryingTimeTracker tracker; - public RpcRetryingCallerImpl(long pause, int retries, int startLogErrorsCnt) { - this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, startLogErrorsCnt, 0); + public RpcRetryingCallerImpl(long pause, long pauseForCQTBE, int retries, int startLogErrorsCnt) { + this(pause, pauseForCQTBE, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, + startLogErrorsCnt, 0); } - public RpcRetryingCallerImpl(long pause, int retries, + public RpcRetryingCallerImpl(long pause, long pauseForCQTBE, int retries, RetryingCallerInterceptor interceptor, int startLogErrorsCnt, int rpcTimeout) { this.pause = pause; + this.pauseForCQTBE = pauseForCQTBE; this.maxAttempts = retries2Attempts(retries); this.interceptor = interceptor; context = interceptor.createEmptyContext(); @@ -126,9 +130,11 @@ public class RpcRetryingCallerImpl implements RpcRetryingCaller { throw new RetriesExhaustedException(tries, exceptions); } // If the server is dead, we need to wait a little before retrying, to give - // a chance to the regions to be - // get right pause time, start by RETRY_BACKOFF[0] * pause - expectedSleep = callable.sleep(pause, tries); + // a chance to the regions to be moved + // get right pause time, start by RETRY_BACKOFF[0] * pauseBase, where pauseBase might be + // special when encountering CallQueueTooBigException, see #HBASE-17114 + long pauseBase = (t instanceof CallQueueTooBigException) ? pauseForCQTBE : pause; + expectedSleep = callable.sleep(pauseBase, tries); // If, after the planned sleep, there won't be enough time left, we stop now. long duration = singleCallDuration(expectedSleep); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 5a2169944f3..bb6cbb55c00 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -236,7 +236,7 @@ public class TestAsyncProcess { } }); - return new RpcRetryingCallerImpl(100, 10, 9) { + return new RpcRetryingCallerImpl(100, 500, 10, 9) { @Override public AbstractResponse callWithoutRetries(RetryingCallable callable, int callTimeout) @@ -280,7 +280,7 @@ public class TestAsyncProcess { private final IOException e; public CallerWithFailure(IOException e) { - super(100, 100, 9); + super(100, 500, 100, 9); this.e = e; } @@ -386,7 +386,7 @@ public class TestAsyncProcess { replicaCalls.incrementAndGet(); } - return new RpcRetryingCallerImpl(100, 10, 9) { + return new RpcRetryingCallerImpl(100, 500, 10, 9) { @Override public MultiResponse callWithoutRetries(RetryingCallable callable, int callTimeout) @@ -1730,4 +1730,68 @@ public class TestAsyncProcess { } t.join(); } + + /** + * Test and make sure we could use a special pause setting when retry with + * CallQueueTooBigException, see HBASE-17114 + * @throws Exception if unexpected error happened during test + */ + @Test + public void testRetryPauseWithCallQueueTooBigException() throws Exception { + Configuration myConf = new Configuration(conf); + final long specialPause = 500L; + final int retries = 1; + myConf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, specialPause); + myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries); + ClusterConnection conn = new MyConnectionImpl(myConf); + BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE); + AsyncProcessWithFailure ap = + new AsyncProcessWithFailure(conn, myConf, new CallQueueTooBigException()); + mutator.ap = ap; + + Assert.assertNotNull(mutator.ap.createServerErrorTracker()); + + Put p = createPut(1, true); + mutator.mutate(p); + + long startTime = System.currentTimeMillis(); + try { + mutator.flush(); + Assert.fail(); + } catch (RetriesExhaustedWithDetailsException expected) { + } + long actualSleep = System.currentTimeMillis() - startTime; + long expectedSleep = 0L; + for (int i = 0; i < retries; i++) { + expectedSleep += ConnectionUtils.getPauseTime(specialPause, i); + // Prevent jitter in CollectionUtils#getPauseTime to affect result + actualSleep += (long) (specialPause * 0.01f); + } + LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms"); + Assert.assertTrue("Expected to sleep " + expectedSleep + " but actually " + actualSleep + "ms", + actualSleep >= expectedSleep); + + // check and confirm normal IOE will use the normal pause + final long normalPause = + myConf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + ap = new AsyncProcessWithFailure(conn, myConf, new IOException()); + mutator.ap = ap; + Assert.assertNotNull(mutator.ap.createServerErrorTracker()); + mutator.mutate(p); + startTime = System.currentTimeMillis(); + try { + mutator.flush(); + Assert.fail(); + } catch (RetriesExhaustedWithDetailsException expected) { + } + actualSleep = System.currentTimeMillis() - startTime; + expectedSleep = 0L; + for (int i = 0; i < retries; i++) { + expectedSleep += ConnectionUtils.getPauseTime(normalPause, i); + } + // plus an additional pause to balance the program execution time + expectedSleep += normalPause; + LOG.debug("Expected to sleep " + expectedSleep + "ms, actually slept " + actualSleep + "ms"); + Assert.assertTrue("Slept for too long: " + actualSleep + "ms", actualSleep <= expectedSleep); + } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 08be194ab81..48d97780249 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -728,6 +728,11 @@ public final class HConstants { */ public static final long DEFAULT_HBASE_CLIENT_PAUSE = 100; + /** + * Parameter name for client pause value for special case such as call queue too big, etc. + */ + public static final String HBASE_CLIENT_PAUSE_FOR_CQTBE = "hbase.client.pause.cqtbe"; + /** * The maximum number of concurrent connections the client will maintain. */ diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index d8baa00fd9a..dfa327022c8 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -457,6 +457,14 @@ possible configurations would overwhelm and obscure the important. See hbase.client.retries.number for description of how we backoff from this initial pause amount and how this pause works w/ retries. + + hbase.client.pause.cqtbe + + Whether or not to use a special client pause for + CallQueueTooBigException (cqtbe). Set this property to a higher value + than hbase.client.pause if you observe frequent CQTBE from the same + RegionServer and the call queue there keeps full + hbase.client.retries.number 35