From 46cc3d4972d8db478c86e9848afed523cb6dc866 Mon Sep 17 00:00:00 2001 From: Mikhail Antonov Date: Mon, 7 Mar 2016 20:53:52 -0800 Subject: [PATCH] HBASE-15137 CallTimeoutException and CallQueueTooBigException should trigger PFFE --- .../client/FastFailInterceptorContext.java | 15 ++- .../client/PreemptiveFastFailInterceptor.java | 81 +++----------- .../exceptions/ClientExceptionsUtil.java | 86 ++++++++++++++- .../PreemptiveFastFailException.java | 104 ++++++++++++------ .../hadoop/hbase/client/TestFastFail.java | 62 +++++++++++ 5 files changed, 249 insertions(+), 99 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java index 9eb56bca101..3cbdfb3f7a4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java @@ -27,8 +27,10 @@ class FastFailInterceptorContext extends // The variable that indicates whether we were able to connect with the server // in the last run - private MutableBoolean couldNotCommunicateWithServer = new MutableBoolean( - false); + private MutableBoolean couldNotCommunicateWithServer = new MutableBoolean(false); + + // If set, we guarantee that no modifications went to server + private MutableBoolean guaranteedClientSideOnly = new MutableBoolean(false); // The variable which indicates whether this was a retry or the first time private boolean didTry = false; @@ -53,6 +55,10 @@ class FastFailInterceptorContext extends return couldNotCommunicateWithServer; } + public MutableBoolean getGuaranteedClientSideOnly() { + return guaranteedClientSideOnly; + } + public FailureInfo getFailureInfo() { return fInfo; } @@ -78,6 +84,10 @@ class FastFailInterceptorContext extends this.couldNotCommunicateWithServer = couldNotCommunicateWithServer; } + public void setGuaranteedClientSideOnly(MutableBoolean guaranteedClientSideOnly) { + this.guaranteedClientSideOnly = guaranteedClientSideOnly; + } + public void setDidTry(boolean didTry) { this.didTry = didTry; } @@ -103,6 +113,7 @@ class FastFailInterceptorContext extends fInfo = null; didTry = false; couldNotCommunicateWithServer.setValue(false); + guaranteedClientSideOnly.setValue(false); retryDespiteFastFailMode = false; tries = 0; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java index 64cd03dfab6..c87d6c7e0e6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java @@ -17,32 +17,26 @@ */ package org.apache.hadoop.hbase.client; -import java.io.EOFException; import java.io.IOException; -import java.io.SyncFailedException; -import java.lang.reflect.UndeclaredThrowableException; -import java.net.ConnectException; -import java.net.SocketTimeoutException; -import java.nio.channels.ClosedChannelException; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeoutException; import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; +import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; -import org.apache.hadoop.hbase.ipc.FailedServerException; +import org.apache.hadoop.hbase.ipc.CallTimeoutException; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.ipc.RemoteException; +import com.google.common.annotations.VisibleForTesting; + /** * * The concrete {@link RetryingCallerInterceptor} class that implements the preemptive fast fail @@ -124,7 +118,8 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor { throw new PreemptiveFastFailException( context.getFailureInfo().numConsecutiveFailures.get(), context.getFailureInfo().timeOfFirstFailureMilliSec, - context.getFailureInfo().timeOfLatestAttemptMilliSec, context.getServer()); + context.getFailureInfo().timeOfLatestAttemptMilliSec, context.getServer(), + context.getGuaranteedClientSideOnly().isTrue()); } } context.setDidTry(true); @@ -133,7 +128,8 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor { public void handleFailure(FastFailInterceptorContext context, Throwable t) throws IOException { handleThrowable(t, context.getServer(), - context.getCouldNotCommunicateWithServer()); + context.getCouldNotCommunicateWithServer(), + context.getGuaranteedClientSideOnly()); } public void updateFailureInfo(FastFailInterceptorContext context) { @@ -153,7 +149,8 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor { * - the throwable to be handled. * @throws PreemptiveFastFailException */ - private void handleFailureToServer(ServerName serverName, Throwable t) { + @VisibleForTesting + protected void handleFailureToServer(ServerName serverName, Throwable t) { if (serverName == null || t == null) { return; } @@ -172,63 +169,19 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor { } public void handleThrowable(Throwable t1, ServerName serverName, - MutableBoolean couldNotCommunicateWithServer) throws IOException { - Throwable t2 = translateException(t1); + MutableBoolean couldNotCommunicateWithServer, + MutableBoolean guaranteedClientSideOnly) throws IOException { + Throwable t2 = ClientExceptionsUtil.translatePFFE(t1); boolean isLocalException = !(t2 instanceof RemoteException); - if (isLocalException && isConnectionException(t2)) { + + if ((isLocalException && ClientExceptionsUtil.isConnectionException(t2)) || + ClientExceptionsUtil.isCallQueueTooBigException(t2)) { couldNotCommunicateWithServer.setValue(true); + guaranteedClientSideOnly.setValue(!(t2 instanceof CallTimeoutException)); handleFailureToServer(serverName, t2); } } - private Throwable translateException(Throwable t) throws IOException { - if (t instanceof NoSuchMethodError) { - // We probably can't recover from this exception by retrying. - LOG.error(t); - throw (NoSuchMethodError) t; - } - - if (t instanceof NullPointerException) { - // The same here. This is probably a bug. - LOG.error(t.getMessage(), t); - throw (NullPointerException) t; - } - - if (t instanceof UndeclaredThrowableException) { - t = t.getCause(); - } - if (t instanceof RemoteException) { - t = ((RemoteException) t).unwrapRemoteException(); - } - if (t instanceof DoNotRetryIOException) { - throw (DoNotRetryIOException) t; - } - if (t instanceof Error) { - throw (Error) t; - } - return t; - } - - /** - * Check if the exception is something that indicates that we cannot - * contact/communicate with the server. - * - * @param e - * @return true when exception indicates that the client wasn't able to make contact with server - */ - private boolean isConnectionException(Throwable e) { - if (e == null) - return false; - // This list covers most connectivity exceptions but not all. - // For example, in SocketOutputStream a plain IOException is thrown - // at times when the channel is closed. - return (e instanceof SocketTimeoutException - || e instanceof ConnectException || e instanceof ClosedChannelException - || e instanceof SyncFailedException || e instanceof EOFException - || e instanceof TimeoutException - || e instanceof ConnectionClosingException || e instanceof FailedServerException); - } - /** * Occasionally cleans up unused information in repeatedFailuresMap. * diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java index 1d6f5d674b1..f7224d5c666 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java @@ -19,13 +19,26 @@ package org.apache.hadoop.hbase.exceptions; +import java.io.EOFException; +import java.io.IOException; +import java.io.SyncFailedException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.ConnectException; +import java.net.SocketTimeoutException; +import java.nio.channels.ClosedChannelException; +import java.util.concurrent.TimeoutException; + import org.apache.hadoop.hbase.CallQueueTooBigException; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.MultiActionResultTooLarge; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RegionTooBusyException; import org.apache.hadoop.hbase.RetryImmediatelyException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.ipc.CallTimeoutException; +import org.apache.hadoop.hbase.ipc.FailedServerException; +import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException; import org.apache.hadoop.hbase.quotas.ThrottlingException; import org.apache.hadoop.ipc.RemoteException; @@ -49,7 +62,7 @@ public final class ClientExceptionsUtil { return (cur instanceof RegionMovedException || cur instanceof RegionOpeningException || cur instanceof RegionTooBusyException || cur instanceof ThrottlingException || cur instanceof MultiActionResultTooLarge || cur instanceof RetryImmediatelyException - || cur instanceof CallQueueTooBigException || cur instanceof NotServingRegionException); + || isCallQueueTooBigException(cur) || cur instanceof NotServingRegionException); } @@ -94,4 +107,75 @@ public final class ClientExceptionsUtil { return null; } + + /** + * Checks if the exception is CallQueueTooBig exception, or tries to unwrap + * {@link RemoteWithExtrasException} to see if we've got {@link CallQueueTooBigException}. + * @param t exception to check + * @return true if it's a CQTBE, false otherwise + */ + public static boolean isCallQueueTooBigException(Throwable t) { + if (t instanceof CallQueueTooBigException) { + return true; + } + if (t instanceof RemoteWithExtrasException) { + return CallQueueTooBigException.class.getName().equals( + ((RemoteWithExtrasException) t).getClassName().trim()); + } else { + return false; + } + } + + /** + * Check if the exception is something that indicates that we cannot + * contact/communicate with the server. + * + * @param e exception to check + * @return true when exception indicates that the client wasn't able to make contact with server + */ + public static boolean isConnectionException(Throwable e) { + if (e == null) { + return false; + } + // This list covers most connectivity exceptions but not all. + // For example, in SocketOutputStream a plain IOException is thrown + // at times when the channel is closed. + return (e instanceof SocketTimeoutException || e instanceof ConnectException + || e instanceof ClosedChannelException || e instanceof SyncFailedException + || e instanceof EOFException || e instanceof TimeoutException + || e instanceof CallTimeoutException || e instanceof ConnectionClosingException + || e instanceof FailedServerException); + } + + /** + * Translates exception for preemptive fast fail checks. + * @param t exception to check + * @return translated exception + * @throws IOException + */ + public static Throwable translatePFFE(Throwable t) throws IOException { + if (t instanceof NoSuchMethodError) { + // We probably can't recover from this exception by retrying. + throw (NoSuchMethodError) t; + } + + if (t instanceof NullPointerException) { + // The same here. This is probably a bug. + throw (NullPointerException) t; + } + + if (t instanceof UndeclaredThrowableException) { + t = t.getCause(); + } + if (t instanceof RemoteException) { + t = ((RemoteException) t).unwrapRemoteException(); + } + if (t instanceof DoNotRetryIOException) { + throw (DoNotRetryIOException) t; + } + if (t instanceof Error) { + throw (Error) t; + } + return t; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/PreemptiveFastFailException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/PreemptiveFastFailException.java index 6ca1d886f92..b31e055837d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/PreemptiveFastFailException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/PreemptiveFastFailException.java @@ -21,9 +21,9 @@ package org.apache.hadoop.hbase.exceptions; import java.net.ConnectException; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.ServerName; /** * Thrown when the client believes that we are trying to communicate to has @@ -32,39 +32,79 @@ import org.apache.hadoop.hbase.ServerName; * On receiving such an exception. The ConnectionManager will skip all * retries and fast fail the operation. */ - @InterfaceAudience.Public - @InterfaceStability.Evolving - public class PreemptiveFastFailException extends ConnectException { - private static final long serialVersionUID = 7129103682617007177L; - private long failureCount, timeOfFirstFailureMilliSec, timeOfLatestAttemptMilliSec; +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class PreemptiveFastFailException extends ConnectException { + private static final long serialVersionUID = 7129103682617007177L; + private long failureCount, timeOfFirstFailureMilliSec, timeOfLatestAttemptMilliSec; - /** - * @param count - * @param timeOfFirstFailureMilliSec - * @param timeOfLatestAttemptMilliSec - * @param serverName - */ - public PreemptiveFastFailException(long count, long timeOfFirstFailureMilliSec, - long timeOfLatestAttemptMilliSec, ServerName serverName) { - super("Exception happened " + count + " times. to" + serverName); - this.failureCount = count; - this.timeOfFirstFailureMilliSec = timeOfFirstFailureMilliSec; - this.timeOfLatestAttemptMilliSec = timeOfLatestAttemptMilliSec; - } + // If set, we guarantee that no modifications went to server + private boolean guaranteedClientSideOnly; - public long getFirstFailureAt() { - return timeOfFirstFailureMilliSec; - } + /** + * @param count num of consecutive failures + * @param timeOfFirstFailureMilliSec when first failure happened + * @param timeOfLatestAttemptMilliSec when last attempt happened + * @param serverName server we failed to connect to + */ + public PreemptiveFastFailException(long count, long timeOfFirstFailureMilliSec, + long timeOfLatestAttemptMilliSec, ServerName serverName) { + super("Exception happened " + count + " times. to" + serverName); + this.failureCount = count; + this.timeOfFirstFailureMilliSec = timeOfFirstFailureMilliSec; + this.timeOfLatestAttemptMilliSec = timeOfLatestAttemptMilliSec; + } - public long getLastAttemptAt() { - return timeOfLatestAttemptMilliSec; - } + /** + * @param count num of consecutive failures + * @param timeOfFirstFailureMilliSec when first failure happened + * @param timeOfLatestAttemptMilliSec when last attempt happened + * @param serverName server we failed to connect to + * @param guaranteedClientSideOnly if true, guarantees that no mutations + * have been applied on the server + */ + public PreemptiveFastFailException(long count, long timeOfFirstFailureMilliSec, + long timeOfLatestAttemptMilliSec, ServerName serverName, + boolean guaranteedClientSideOnly) { + super("Exception happened " + count + " times. to" + serverName); + this.failureCount = count; + this.timeOfFirstFailureMilliSec = timeOfFirstFailureMilliSec; + this.timeOfLatestAttemptMilliSec = timeOfLatestAttemptMilliSec; + this.guaranteedClientSideOnly = guaranteedClientSideOnly; + } - public long getFailureCount() { - return failureCount; - } + /** + * @return time of the fist failure + */ + public long getFirstFailureAt() { + return timeOfFirstFailureMilliSec; + } - public boolean wasOperationAttemptedByServer() { - return false; - } - } \ No newline at end of file + /** + * @return time of the latest attempt + */ + public long getLastAttemptAt() { + return timeOfLatestAttemptMilliSec; + } + + /** + * @return failure count + */ + public long getFailureCount() { + return failureCount; + } + + /** + * @return true if operation was attempted by server, false otherwise. + */ + public boolean wasOperationAttemptedByServer() { + return false; + } + + /** + * @return true if we know no mutation made it to the server, false otherwise. + */ + public boolean isGuaranteedClientSideOnly() { + return guaranteedClientSideOnly; + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java index 5ceef018c4d..07f1948abc7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java @@ -35,12 +35,15 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; +import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -85,6 +88,7 @@ public class TestFastFail { @Before public void setUp() throws Exception { MyPreemptiveFastFailInterceptor.numBraveSouls.set(0); + CallQueueTooBigPffeInterceptor.numCallQueueTooBig.set(0); } /** @@ -285,6 +289,49 @@ public class TestFastFail { .get()); } + @Test + public void testCallQueueTooBigException() throws Exception { + Admin admin = TEST_UTIL.getHBaseAdmin(); + + final String tableName = "testCallQueueTooBigException"; + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(Bytes + .toBytes(tableName))); + desc.addFamily(new HColumnDescriptor(FAMILY)); + admin.createTable(desc, Bytes.toBytes("aaaa"), Bytes.toBytes("zzzz"), 3); + + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 100); + conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 500); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + + conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true); + conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 0); + conf.setClass(HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL, + CallQueueTooBigPffeInterceptor.class, PreemptiveFastFailInterceptor.class); + + final Connection connection = ConnectionFactory.createConnection(conf); + + //Set max call queues size to 0 + SimpleRpcScheduler srs = (SimpleRpcScheduler) + TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer().getScheduler(); + Configuration newConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); + newConf.setInt("hbase.ipc.server.max.callqueue.length", 0); + srs.onConfigurationChange(newConf); + + try (Table table = connection.getTable(TableName.valueOf(tableName))) { + Get get = new Get(new byte[1]); + table.get(get); + } catch (Throwable ex) { + } + + assertEquals("There should have been 1 hit", 1, + CallQueueTooBigPffeInterceptor.numCallQueueTooBig.get()); + + newConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); + newConf.setInt("hbase.ipc.server.max.callqueue.length", 250); + srs.onConfigurationChange(newConf); + } + public static class MyPreemptiveFastFailInterceptor extends PreemptiveFastFailInterceptor { public static AtomicInteger numBraveSouls = new AtomicInteger(); @@ -305,4 +352,19 @@ public class TestFastFail { private byte[] longToByteArrayKey(long rowKey) { return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes(); } + + public static class CallQueueTooBigPffeInterceptor extends + PreemptiveFastFailInterceptor { + public static AtomicInteger numCallQueueTooBig = new AtomicInteger(); + + @Override + protected void handleFailureToServer(ServerName serverName, Throwable t) { + super.handleFailureToServer(serverName, t); + numCallQueueTooBig.incrementAndGet(); + } + + public CallQueueTooBigPffeInterceptor(Configuration conf) { + super(conf); + } + } }