HBASE-15137 CallTimeoutException and CallQueueTooBigException should trigger PFFE

This commit is contained in:
Mikhail Antonov 2016-03-07 20:53:52 -08:00
parent 4f044cf6be
commit 46cc3d4972
5 changed files with 249 additions and 99 deletions

View File

@ -27,8 +27,10 @@ class FastFailInterceptorContext extends
// The variable that indicates whether we were able to connect with the server
// in the last run
private MutableBoolean couldNotCommunicateWithServer = new MutableBoolean(
false);
private MutableBoolean couldNotCommunicateWithServer = new MutableBoolean(false);
// If set, we guarantee that no modifications went to server
private MutableBoolean guaranteedClientSideOnly = new MutableBoolean(false);
// The variable which indicates whether this was a retry or the first time
private boolean didTry = false;
@ -53,6 +55,10 @@ class FastFailInterceptorContext extends
return couldNotCommunicateWithServer;
}
public MutableBoolean getGuaranteedClientSideOnly() {
return guaranteedClientSideOnly;
}
public FailureInfo getFailureInfo() {
return fInfo;
}
@ -78,6 +84,10 @@ class FastFailInterceptorContext extends
this.couldNotCommunicateWithServer = couldNotCommunicateWithServer;
}
public void setGuaranteedClientSideOnly(MutableBoolean guaranteedClientSideOnly) {
this.guaranteedClientSideOnly = guaranteedClientSideOnly;
}
public void setDidTry(boolean didTry) {
this.didTry = didTry;
}
@ -103,6 +113,7 @@ class FastFailInterceptorContext extends
fInfo = null;
didTry = false;
couldNotCommunicateWithServer.setValue(false);
guaranteedClientSideOnly.setValue(false);
retryDespiteFastFailMode = false;
tries = 0;
}

View File

@ -17,32 +17,26 @@
*/
package org.apache.hadoop.hbase.client;
import java.io.EOFException;
import java.io.IOException;
import java.io.SyncFailedException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.nio.channels.ClosedChannelException;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
import org.apache.hadoop.hbase.ipc.FailedServerException;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import com.google.common.annotations.VisibleForTesting;
/**
*
* The concrete {@link RetryingCallerInterceptor} class that implements the preemptive fast fail
@ -124,7 +118,8 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor {
throw new PreemptiveFastFailException(
context.getFailureInfo().numConsecutiveFailures.get(),
context.getFailureInfo().timeOfFirstFailureMilliSec,
context.getFailureInfo().timeOfLatestAttemptMilliSec, context.getServer());
context.getFailureInfo().timeOfLatestAttemptMilliSec, context.getServer(),
context.getGuaranteedClientSideOnly().isTrue());
}
}
context.setDidTry(true);
@ -133,7 +128,8 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor {
public void handleFailure(FastFailInterceptorContext context,
Throwable t) throws IOException {
handleThrowable(t, context.getServer(),
context.getCouldNotCommunicateWithServer());
context.getCouldNotCommunicateWithServer(),
context.getGuaranteedClientSideOnly());
}
public void updateFailureInfo(FastFailInterceptorContext context) {
@ -153,7 +149,8 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor {
* - the throwable to be handled.
* @throws PreemptiveFastFailException
*/
private void handleFailureToServer(ServerName serverName, Throwable t) {
@VisibleForTesting
protected void handleFailureToServer(ServerName serverName, Throwable t) {
if (serverName == null || t == null) {
return;
}
@ -172,63 +169,19 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor {
}
public void handleThrowable(Throwable t1, ServerName serverName,
MutableBoolean couldNotCommunicateWithServer) throws IOException {
Throwable t2 = translateException(t1);
MutableBoolean couldNotCommunicateWithServer,
MutableBoolean guaranteedClientSideOnly) throws IOException {
Throwable t2 = ClientExceptionsUtil.translatePFFE(t1);
boolean isLocalException = !(t2 instanceof RemoteException);
if (isLocalException && isConnectionException(t2)) {
if ((isLocalException && ClientExceptionsUtil.isConnectionException(t2)) ||
ClientExceptionsUtil.isCallQueueTooBigException(t2)) {
couldNotCommunicateWithServer.setValue(true);
guaranteedClientSideOnly.setValue(!(t2 instanceof CallTimeoutException));
handleFailureToServer(serverName, t2);
}
}
private Throwable translateException(Throwable t) throws IOException {
if (t instanceof NoSuchMethodError) {
// We probably can't recover from this exception by retrying.
LOG.error(t);
throw (NoSuchMethodError) t;
}
if (t instanceof NullPointerException) {
// The same here. This is probably a bug.
LOG.error(t.getMessage(), t);
throw (NullPointerException) t;
}
if (t instanceof UndeclaredThrowableException) {
t = t.getCause();
}
if (t instanceof RemoteException) {
t = ((RemoteException) t).unwrapRemoteException();
}
if (t instanceof DoNotRetryIOException) {
throw (DoNotRetryIOException) t;
}
if (t instanceof Error) {
throw (Error) t;
}
return t;
}
/**
* Check if the exception is something that indicates that we cannot
* contact/communicate with the server.
*
* @param e
* @return true when exception indicates that the client wasn't able to make contact with server
*/
private boolean isConnectionException(Throwable e) {
if (e == null)
return false;
// This list covers most connectivity exceptions but not all.
// For example, in SocketOutputStream a plain IOException is thrown
// at times when the channel is closed.
return (e instanceof SocketTimeoutException
|| e instanceof ConnectException || e instanceof ClosedChannelException
|| e instanceof SyncFailedException || e instanceof EOFException
|| e instanceof TimeoutException
|| e instanceof ConnectionClosingException || e instanceof FailedServerException);
}
/**
* Occasionally cleans up unused information in repeatedFailuresMap.
*

View File

@ -19,13 +19,26 @@
package org.apache.hadoop.hbase.exceptions;
import java.io.EOFException;
import java.io.IOException;
import java.io.SyncFailedException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.ipc.FailedServerException;
import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
import org.apache.hadoop.hbase.quotas.ThrottlingException;
import org.apache.hadoop.ipc.RemoteException;
@ -49,7 +62,7 @@ public final class ClientExceptionsUtil {
return (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
|| cur instanceof RegionTooBusyException || cur instanceof ThrottlingException
|| cur instanceof MultiActionResultTooLarge || cur instanceof RetryImmediatelyException
|| cur instanceof CallQueueTooBigException || cur instanceof NotServingRegionException);
|| isCallQueueTooBigException(cur) || cur instanceof NotServingRegionException);
}
@ -94,4 +107,75 @@ public final class ClientExceptionsUtil {
return null;
}
/**
* Checks if the exception is CallQueueTooBig exception, or tries to unwrap
* {@link RemoteWithExtrasException} to see if we've got {@link CallQueueTooBigException}.
* @param t exception to check
* @return true if it's a CQTBE, false otherwise
*/
public static boolean isCallQueueTooBigException(Throwable t) {
if (t instanceof CallQueueTooBigException) {
return true;
}
if (t instanceof RemoteWithExtrasException) {
return CallQueueTooBigException.class.getName().equals(
((RemoteWithExtrasException) t).getClassName().trim());
} else {
return false;
}
}
/**
* Check if the exception is something that indicates that we cannot
* contact/communicate with the server.
*
* @param e exception to check
* @return true when exception indicates that the client wasn't able to make contact with server
*/
public static boolean isConnectionException(Throwable e) {
if (e == null) {
return false;
}
// This list covers most connectivity exceptions but not all.
// For example, in SocketOutputStream a plain IOException is thrown
// at times when the channel is closed.
return (e instanceof SocketTimeoutException || e instanceof ConnectException
|| e instanceof ClosedChannelException || e instanceof SyncFailedException
|| e instanceof EOFException || e instanceof TimeoutException
|| e instanceof CallTimeoutException || e instanceof ConnectionClosingException
|| e instanceof FailedServerException);
}
/**
* Translates exception for preemptive fast fail checks.
* @param t exception to check
* @return translated exception
* @throws IOException
*/
public static Throwable translatePFFE(Throwable t) throws IOException {
if (t instanceof NoSuchMethodError) {
// We probably can't recover from this exception by retrying.
throw (NoSuchMethodError) t;
}
if (t instanceof NullPointerException) {
// The same here. This is probably a bug.
throw (NullPointerException) t;
}
if (t instanceof UndeclaredThrowableException) {
t = t.getCause();
}
if (t instanceof RemoteException) {
t = ((RemoteException) t).unwrapRemoteException();
}
if (t instanceof DoNotRetryIOException) {
throw (DoNotRetryIOException) t;
}
if (t instanceof Error) {
throw (Error) t;
}
return t;
}
}

View File

@ -21,9 +21,9 @@ package org.apache.hadoop.hbase.exceptions;
import java.net.ConnectException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.ServerName;
/**
* Thrown when the client believes that we are trying to communicate to has
@ -32,17 +32,20 @@ import org.apache.hadoop.hbase.ServerName;
* On receiving such an exception. The ConnectionManager will skip all
* retries and fast fail the operation.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class PreemptiveFastFailException extends ConnectException {
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class PreemptiveFastFailException extends ConnectException {
private static final long serialVersionUID = 7129103682617007177L;
private long failureCount, timeOfFirstFailureMilliSec, timeOfLatestAttemptMilliSec;
// If set, we guarantee that no modifications went to server
private boolean guaranteedClientSideOnly;
/**
* @param count
* @param timeOfFirstFailureMilliSec
* @param timeOfLatestAttemptMilliSec
* @param serverName
* @param count num of consecutive failures
* @param timeOfFirstFailureMilliSec when first failure happened
* @param timeOfLatestAttemptMilliSec when last attempt happened
* @param serverName server we failed to connect to
*/
public PreemptiveFastFailException(long count, long timeOfFirstFailureMilliSec,
long timeOfLatestAttemptMilliSec, ServerName serverName) {
@ -52,19 +55,56 @@ import org.apache.hadoop.hbase.ServerName;
this.timeOfLatestAttemptMilliSec = timeOfLatestAttemptMilliSec;
}
/**
* @param count num of consecutive failures
* @param timeOfFirstFailureMilliSec when first failure happened
* @param timeOfLatestAttemptMilliSec when last attempt happened
* @param serverName server we failed to connect to
* @param guaranteedClientSideOnly if true, guarantees that no mutations
* have been applied on the server
*/
public PreemptiveFastFailException(long count, long timeOfFirstFailureMilliSec,
long timeOfLatestAttemptMilliSec, ServerName serverName,
boolean guaranteedClientSideOnly) {
super("Exception happened " + count + " times. to" + serverName);
this.failureCount = count;
this.timeOfFirstFailureMilliSec = timeOfFirstFailureMilliSec;
this.timeOfLatestAttemptMilliSec = timeOfLatestAttemptMilliSec;
this.guaranteedClientSideOnly = guaranteedClientSideOnly;
}
/**
* @return time of the fist failure
*/
public long getFirstFailureAt() {
return timeOfFirstFailureMilliSec;
}
/**
* @return time of the latest attempt
*/
public long getLastAttemptAt() {
return timeOfLatestAttemptMilliSec;
}
/**
* @return failure count
*/
public long getFailureCount() {
return failureCount;
}
/**
* @return true if operation was attempted by server, false otherwise.
*/
public boolean wasOperationAttemptedByServer() {
return false;
}
/**
* @return true if we know no mutation made it to the server, false otherwise.
*/
public boolean isGuaranteedClientSideOnly() {
return guaranteedClientSideOnly;
}
}

View File

@ -35,12 +35,15 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -85,6 +88,7 @@ public class TestFastFail {
@Before
public void setUp() throws Exception {
MyPreemptiveFastFailInterceptor.numBraveSouls.set(0);
CallQueueTooBigPffeInterceptor.numCallQueueTooBig.set(0);
}
/**
@ -285,6 +289,49 @@ public class TestFastFail {
.get());
}
@Test
public void testCallQueueTooBigException() throws Exception {
Admin admin = TEST_UTIL.getHBaseAdmin();
final String tableName = "testCallQueueTooBigException";
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(Bytes
.toBytes(tableName)));
desc.addFamily(new HColumnDescriptor(FAMILY));
admin.createTable(desc, Bytes.toBytes("aaaa"), Bytes.toBytes("zzzz"), 3);
Configuration conf = TEST_UTIL.getConfiguration();
conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 100);
conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 500);
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 0);
conf.setClass(HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL,
CallQueueTooBigPffeInterceptor.class, PreemptiveFastFailInterceptor.class);
final Connection connection = ConnectionFactory.createConnection(conf);
//Set max call queues size to 0
SimpleRpcScheduler srs = (SimpleRpcScheduler)
TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer().getScheduler();
Configuration newConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
newConf.setInt("hbase.ipc.server.max.callqueue.length", 0);
srs.onConfigurationChange(newConf);
try (Table table = connection.getTable(TableName.valueOf(tableName))) {
Get get = new Get(new byte[1]);
table.get(get);
} catch (Throwable ex) {
}
assertEquals("There should have been 1 hit", 1,
CallQueueTooBigPffeInterceptor.numCallQueueTooBig.get());
newConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
newConf.setInt("hbase.ipc.server.max.callqueue.length", 250);
srs.onConfigurationChange(newConf);
}
public static class MyPreemptiveFastFailInterceptor extends
PreemptiveFastFailInterceptor {
public static AtomicInteger numBraveSouls = new AtomicInteger();
@ -305,4 +352,19 @@ public class TestFastFail {
private byte[] longToByteArrayKey(long rowKey) {
return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes();
}
public static class CallQueueTooBigPffeInterceptor extends
PreemptiveFastFailInterceptor {
public static AtomicInteger numCallQueueTooBig = new AtomicInteger();
@Override
protected void handleFailureToServer(ServerName serverName, Throwable t) {
super.handleFailureToServer(serverName, t);
numCallQueueTooBig.incrementAndGet();
}
public CallQueueTooBigPffeInterceptor(Configuration conf) {
super(conf);
}
}
}