HBASE-15137 CallTimeoutException and CallQueueTooBigException should trigger PFFE
This commit is contained in:
parent
0a025a1aca
commit
3339bac563
|
@ -27,8 +27,10 @@ class FastFailInterceptorContext extends
|
|||
|
||||
// The variable that indicates whether we were able to connect with the server
|
||||
// in the last run
|
||||
private MutableBoolean couldNotCommunicateWithServer = new MutableBoolean(
|
||||
false);
|
||||
private MutableBoolean couldNotCommunicateWithServer = new MutableBoolean(false);
|
||||
|
||||
// If set, we guarantee that no modifications went to server
|
||||
private MutableBoolean guaranteedClientSideOnly = new MutableBoolean(false);
|
||||
|
||||
// The variable which indicates whether this was a retry or the first time
|
||||
private boolean didTry = false;
|
||||
|
@ -53,6 +55,10 @@ class FastFailInterceptorContext extends
|
|||
return couldNotCommunicateWithServer;
|
||||
}
|
||||
|
||||
public MutableBoolean getGuaranteedClientSideOnly() {
|
||||
return guaranteedClientSideOnly;
|
||||
}
|
||||
|
||||
public FailureInfo getFailureInfo() {
|
||||
return fInfo;
|
||||
}
|
||||
|
@ -78,6 +84,10 @@ class FastFailInterceptorContext extends
|
|||
this.couldNotCommunicateWithServer = couldNotCommunicateWithServer;
|
||||
}
|
||||
|
||||
public void setGuaranteedClientSideOnly(MutableBoolean guaranteedClientSideOnly) {
|
||||
this.guaranteedClientSideOnly = guaranteedClientSideOnly;
|
||||
}
|
||||
|
||||
public void setDidTry(boolean didTry) {
|
||||
this.didTry = didTry;
|
||||
}
|
||||
|
@ -103,6 +113,7 @@ class FastFailInterceptorContext extends
|
|||
fInfo = null;
|
||||
didTry = false;
|
||||
couldNotCommunicateWithServer.setValue(false);
|
||||
guaranteedClientSideOnly.setValue(false);
|
||||
retryDespiteFastFailMode = false;
|
||||
tries = 0;
|
||||
}
|
||||
|
|
|
@ -17,32 +17,26 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.SyncFailedException;
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.commons.lang.mutable.MutableBoolean;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
|
||||
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
|
||||
import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
|
||||
import org.apache.hadoop.hbase.ipc.FailedServerException;
|
||||
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
*
|
||||
* The concrete {@link RetryingCallerInterceptor} class that implements the preemptive fast fail
|
||||
|
@ -124,7 +118,8 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor {
|
|||
throw new PreemptiveFastFailException(
|
||||
context.getFailureInfo().numConsecutiveFailures.get(),
|
||||
context.getFailureInfo().timeOfFirstFailureMilliSec,
|
||||
context.getFailureInfo().timeOfLatestAttemptMilliSec, context.getServer());
|
||||
context.getFailureInfo().timeOfLatestAttemptMilliSec, context.getServer(),
|
||||
context.getGuaranteedClientSideOnly().isTrue());
|
||||
}
|
||||
}
|
||||
context.setDidTry(true);
|
||||
|
@ -133,7 +128,8 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor {
|
|||
public void handleFailure(FastFailInterceptorContext context,
|
||||
Throwable t) throws IOException {
|
||||
handleThrowable(t, context.getServer(),
|
||||
context.getCouldNotCommunicateWithServer());
|
||||
context.getCouldNotCommunicateWithServer(),
|
||||
context.getGuaranteedClientSideOnly());
|
||||
}
|
||||
|
||||
public void updateFailureInfo(FastFailInterceptorContext context) {
|
||||
|
@ -153,7 +149,8 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor {
|
|||
* - the throwable to be handled.
|
||||
* @throws PreemptiveFastFailException
|
||||
*/
|
||||
private void handleFailureToServer(ServerName serverName, Throwable t) {
|
||||
@VisibleForTesting
|
||||
protected void handleFailureToServer(ServerName serverName, Throwable t) {
|
||||
if (serverName == null || t == null) {
|
||||
return;
|
||||
}
|
||||
|
@ -172,66 +169,19 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor {
|
|||
}
|
||||
|
||||
public void handleThrowable(Throwable t1, ServerName serverName,
|
||||
MutableBoolean couldNotCommunicateWithServer) throws IOException {
|
||||
Throwable t2 = translateException(t1);
|
||||
MutableBoolean couldNotCommunicateWithServer,
|
||||
MutableBoolean guaranteedClientSideOnly) throws IOException {
|
||||
Throwable t2 = ClientExceptionsUtil.translatePFFE(t1);
|
||||
boolean isLocalException = !(t2 instanceof RemoteException);
|
||||
if (isLocalException && isConnectionException(t2)) {
|
||||
|
||||
if ((isLocalException && ClientExceptionsUtil.isConnectionException(t2)) ||
|
||||
ClientExceptionsUtil.isCallQueueTooBigException(t2)) {
|
||||
couldNotCommunicateWithServer.setValue(true);
|
||||
guaranteedClientSideOnly.setValue(!(t2 instanceof CallTimeoutException));
|
||||
handleFailureToServer(serverName, t2);
|
||||
}
|
||||
}
|
||||
|
||||
private Throwable translateException(Throwable t) throws IOException {
|
||||
if (t instanceof NoSuchMethodError) {
|
||||
// We probably can't recover from this exception by retrying.
|
||||
LOG.error(t);
|
||||
throw (NoSuchMethodError) t;
|
||||
}
|
||||
|
||||
if (t instanceof NullPointerException) {
|
||||
// The same here. This is probably a bug.
|
||||
LOG.error(t.getMessage(), t);
|
||||
throw (NullPointerException) t;
|
||||
}
|
||||
|
||||
if (t instanceof UndeclaredThrowableException) {
|
||||
t = t.getCause();
|
||||
}
|
||||
if (t instanceof RemoteException) {
|
||||
t = ((RemoteException) t).unwrapRemoteException();
|
||||
}
|
||||
if (t instanceof DoNotRetryIOException) {
|
||||
throw (DoNotRetryIOException) t;
|
||||
}
|
||||
if (t instanceof NeedUnmanagedConnectionException) {
|
||||
throw new DoNotRetryIOException(t);
|
||||
}
|
||||
if (t instanceof Error) {
|
||||
throw (Error) t;
|
||||
}
|
||||
return t;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the exception is something that indicates that we cannot
|
||||
* contact/communicate with the server.
|
||||
*
|
||||
* @param e
|
||||
* @return true when exception indicates that the client wasn't able to make contact with server
|
||||
*/
|
||||
private boolean isConnectionException(Throwable e) {
|
||||
if (e == null)
|
||||
return false;
|
||||
// This list covers most connectivity exceptions but not all.
|
||||
// For example, in SocketOutputStream a plain IOException is thrown
|
||||
// at times when the channel is closed.
|
||||
return (e instanceof SocketTimeoutException
|
||||
|| e instanceof ConnectException || e instanceof ClosedChannelException
|
||||
|| e instanceof SyncFailedException || e instanceof EOFException
|
||||
|| e instanceof TimeoutException
|
||||
|| e instanceof ConnectionClosingException || e instanceof FailedServerException);
|
||||
}
|
||||
|
||||
/**
|
||||
* Occasionally cleans up unused information in repeatedFailuresMap.
|
||||
*
|
||||
|
|
|
@ -19,13 +19,27 @@
|
|||
|
||||
package org.apache.hadoop.hbase.exceptions;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.SyncFailedException;
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
|
||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.RegionTooBusyException;
|
||||
import org.apache.hadoop.hbase.RetryImmediatelyException;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
||||
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
|
||||
import org.apache.hadoop.hbase.ipc.FailedServerException;
|
||||
import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
|
||||
import org.apache.hadoop.hbase.quotas.ThrottlingException;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
||||
|
@ -49,7 +63,7 @@ public final class ClientExceptionsUtil {
|
|||
return (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
|
||||
|| cur instanceof RegionTooBusyException || cur instanceof ThrottlingException
|
||||
|| cur instanceof MultiActionResultTooLarge || cur instanceof RetryImmediatelyException
|
||||
|| cur instanceof CallQueueTooBigException || cur instanceof NotServingRegionException);
|
||||
|| isCallQueueTooBigException(cur) || cur instanceof NotServingRegionException);
|
||||
}
|
||||
|
||||
|
||||
|
@ -94,4 +108,79 @@ public final class ClientExceptionsUtil {
|
|||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the exception is CallQueueTooBig exception, or tries to unwrap
|
||||
* {@link RemoteWithExtrasException} to see if we've got {@link CallQueueTooBigException}.
|
||||
* @param t exception to check
|
||||
* @return true if it's a CQTBE, false otherwise
|
||||
*/
|
||||
public static boolean isCallQueueTooBigException(Throwable t) {
|
||||
if (t instanceof CallQueueTooBigException) {
|
||||
return true;
|
||||
}
|
||||
Throwable t2 = t;
|
||||
if (t instanceof RetriesExhaustedException) {
|
||||
t2 = t.getCause();
|
||||
}
|
||||
if (t2 instanceof RemoteWithExtrasException) {
|
||||
return CallQueueTooBigException.class.getName().equals(
|
||||
((RemoteWithExtrasException) t2).getClassName().trim());
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the exception is something that indicates that we cannot
|
||||
* contact/communicate with the server.
|
||||
*
|
||||
* @param e exception to check
|
||||
* @return true when exception indicates that the client wasn't able to make contact with server
|
||||
*/
|
||||
public static boolean isConnectionException(Throwable e) {
|
||||
if (e == null) {
|
||||
return false;
|
||||
}
|
||||
// This list covers most connectivity exceptions but not all.
|
||||
// For example, in SocketOutputStream a plain IOException is thrown
|
||||
// at times when the channel is closed.
|
||||
return (e instanceof SocketTimeoutException || e instanceof ConnectException
|
||||
|| e instanceof ClosedChannelException || e instanceof SyncFailedException
|
||||
|| e instanceof EOFException || e instanceof TimeoutException
|
||||
|| e instanceof CallTimeoutException || e instanceof ConnectionClosingException
|
||||
|| e instanceof FailedServerException);
|
||||
}
|
||||
|
||||
/**
|
||||
* Translates exception for preemptive fast fail checks.
|
||||
* @param t exception to check
|
||||
* @return translated exception
|
||||
* @throws IOException
|
||||
*/
|
||||
public static Throwable translatePFFE(Throwable t) throws IOException {
|
||||
if (t instanceof NoSuchMethodError) {
|
||||
// We probably can't recover from this exception by retrying.
|
||||
throw (NoSuchMethodError) t;
|
||||
}
|
||||
|
||||
if (t instanceof NullPointerException) {
|
||||
// The same here. This is probably a bug.
|
||||
throw (NullPointerException) t;
|
||||
}
|
||||
|
||||
if (t instanceof UndeclaredThrowableException) {
|
||||
t = t.getCause();
|
||||
}
|
||||
if (t instanceof RemoteException) {
|
||||
t = ((RemoteException) t).unwrapRemoteException();
|
||||
}
|
||||
if (t instanceof DoNotRetryIOException) {
|
||||
throw (DoNotRetryIOException) t;
|
||||
}
|
||||
if (t instanceof Error) {
|
||||
throw (Error) t;
|
||||
}
|
||||
return t;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,9 +21,9 @@ package org.apache.hadoop.hbase.exceptions;
|
|||
|
||||
import java.net.ConnectException;
|
||||
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
|
||||
/**
|
||||
* Thrown when the client believes that we are trying to communicate to has
|
||||
|
@ -32,39 +32,79 @@ import org.apache.hadoop.hbase.ServerName;
|
|||
* On receiving such an exception. The HConnectionManager will skip all
|
||||
* retries and fast fail the operation.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class PreemptiveFastFailException extends ConnectException {
|
||||
private static final long serialVersionUID = 7129103682617007177L;
|
||||
private long failureCount, timeOfFirstFailureMilliSec, timeOfLatestAttemptMilliSec;
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class PreemptiveFastFailException extends ConnectException {
|
||||
private static final long serialVersionUID = 7129103682617007177L;
|
||||
private long failureCount, timeOfFirstFailureMilliSec, timeOfLatestAttemptMilliSec;
|
||||
|
||||
/**
|
||||
* @param count
|
||||
* @param timeOfFirstFailureMilliSec
|
||||
* @param timeOfLatestAttemptMilliSec
|
||||
* @param serverName
|
||||
*/
|
||||
public PreemptiveFastFailException(long count, long timeOfFirstFailureMilliSec,
|
||||
long timeOfLatestAttemptMilliSec, ServerName serverName) {
|
||||
super("Exception happened " + count + " times. to" + serverName);
|
||||
this.failureCount = count;
|
||||
this.timeOfFirstFailureMilliSec = timeOfFirstFailureMilliSec;
|
||||
this.timeOfLatestAttemptMilliSec = timeOfLatestAttemptMilliSec;
|
||||
}
|
||||
// If set, we guarantee that no modifications went to server
|
||||
private boolean guaranteedClientSideOnly;
|
||||
|
||||
public long getFirstFailureAt() {
|
||||
return timeOfFirstFailureMilliSec;
|
||||
}
|
||||
/**
|
||||
* @param count num of consecutive failures
|
||||
* @param timeOfFirstFailureMilliSec when first failure happened
|
||||
* @param timeOfLatestAttemptMilliSec when last attempt happened
|
||||
* @param serverName server we failed to connect to
|
||||
*/
|
||||
public PreemptiveFastFailException(long count, long timeOfFirstFailureMilliSec,
|
||||
long timeOfLatestAttemptMilliSec, ServerName serverName) {
|
||||
super("Exception happened " + count + " times. to" + serverName);
|
||||
this.failureCount = count;
|
||||
this.timeOfFirstFailureMilliSec = timeOfFirstFailureMilliSec;
|
||||
this.timeOfLatestAttemptMilliSec = timeOfLatestAttemptMilliSec;
|
||||
}
|
||||
|
||||
public long getLastAttemptAt() {
|
||||
return timeOfLatestAttemptMilliSec;
|
||||
}
|
||||
/**
|
||||
* @param count num of consecutive failures
|
||||
* @param timeOfFirstFailureMilliSec when first failure happened
|
||||
* @param timeOfLatestAttemptMilliSec when last attempt happened
|
||||
* @param serverName server we failed to connect to
|
||||
* @param guaranteedClientSideOnly if true, guarantees that no mutations
|
||||
* have been applied on the server
|
||||
*/
|
||||
public PreemptiveFastFailException(long count, long timeOfFirstFailureMilliSec,
|
||||
long timeOfLatestAttemptMilliSec, ServerName serverName,
|
||||
boolean guaranteedClientSideOnly) {
|
||||
super("Exception happened " + count + " times. to" + serverName);
|
||||
this.failureCount = count;
|
||||
this.timeOfFirstFailureMilliSec = timeOfFirstFailureMilliSec;
|
||||
this.timeOfLatestAttemptMilliSec = timeOfLatestAttemptMilliSec;
|
||||
this.guaranteedClientSideOnly = guaranteedClientSideOnly;
|
||||
}
|
||||
|
||||
public long getFailureCount() {
|
||||
return failureCount;
|
||||
}
|
||||
/**
|
||||
* @return time of the fist failure
|
||||
*/
|
||||
public long getFirstFailureAt() {
|
||||
return timeOfFirstFailureMilliSec;
|
||||
}
|
||||
|
||||
public boolean wasOperationAttemptedByServer() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
/**
|
||||
* @return time of the latest attempt
|
||||
*/
|
||||
public long getLastAttemptAt() {
|
||||
return timeOfLatestAttemptMilliSec;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return failure count
|
||||
*/
|
||||
public long getFailureCount() {
|
||||
return failureCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if operation was attempted by server, false otherwise.
|
||||
*/
|
||||
public boolean wasOperationAttemptedByServer() {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if we know no mutation made it to the server, false otherwise.
|
||||
*/
|
||||
public boolean isGuaranteedClientSideOnly() {
|
||||
return guaranteedClientSideOnly;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,14 +36,17 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ClusterStatus;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
|
||||
|
@ -61,7 +64,7 @@ public class TestFastFail {
|
|||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static byte[] FAMILY = Bytes.toBytes("testFamily");
|
||||
private static final Random random = new Random();
|
||||
private static int SLAVES = 3;
|
||||
private static int SLAVES = 1;
|
||||
private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
|
||||
private static final int SLEEPTIME = 5000;
|
||||
|
||||
|
@ -87,6 +90,7 @@ public class TestFastFail {
|
|||
@Before
|
||||
public void setUp() throws Exception {
|
||||
MyPreemptiveFastFailInterceptor.numBraveSouls.set(0);
|
||||
CallQueueTooBigPffeInterceptor.numCallQueueTooBig.set(0);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -298,6 +302,49 @@ public class TestFastFail {
|
|||
.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCallQueueTooBigException() throws Exception {
|
||||
Admin admin = TEST_UTIL.getHBaseAdmin();
|
||||
|
||||
final String tableName = "testCallQueueTooBigException";
|
||||
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(Bytes
|
||||
.toBytes(tableName)));
|
||||
desc.addFamily(new HColumnDescriptor(FAMILY));
|
||||
admin.createTable(desc, Bytes.toBytes("aaaa"), Bytes.toBytes("zzzz"), 3);
|
||||
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 100);
|
||||
conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 500);
|
||||
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
|
||||
|
||||
conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
|
||||
conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 0);
|
||||
conf.setClass(HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL,
|
||||
CallQueueTooBigPffeInterceptor.class, PreemptiveFastFailInterceptor.class);
|
||||
|
||||
final Connection connection = ConnectionFactory.createConnection(conf);
|
||||
|
||||
//Set max call queues size to 0
|
||||
SimpleRpcScheduler srs = (SimpleRpcScheduler)
|
||||
TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer().getScheduler();
|
||||
Configuration newConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
|
||||
newConf.setInt("hbase.ipc.server.max.callqueue.length", 0);
|
||||
srs.onConfigurationChange(newConf);
|
||||
|
||||
try (Table table = connection.getTable(TableName.valueOf(tableName))) {
|
||||
Get get = new Get(new byte[1]);
|
||||
table.get(get);
|
||||
} catch (Throwable ex) {
|
||||
}
|
||||
|
||||
assertEquals("There should have been 1 hit", 1,
|
||||
CallQueueTooBigPffeInterceptor.numCallQueueTooBig.get());
|
||||
|
||||
newConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
|
||||
newConf.setInt("hbase.ipc.server.max.callqueue.length", 250);
|
||||
srs.onConfigurationChange(newConf);
|
||||
}
|
||||
|
||||
public static class MyPreemptiveFastFailInterceptor extends
|
||||
PreemptiveFastFailInterceptor {
|
||||
public static AtomicInteger numBraveSouls = new AtomicInteger();
|
||||
|
@ -318,4 +365,19 @@ public class TestFastFail {
|
|||
private byte[] longToByteArrayKey(long rowKey) {
|
||||
return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes();
|
||||
}
|
||||
|
||||
public static class CallQueueTooBigPffeInterceptor extends
|
||||
PreemptiveFastFailInterceptor {
|
||||
public static AtomicInteger numCallQueueTooBig = new AtomicInteger();
|
||||
|
||||
@Override
|
||||
protected void handleFailureToServer(ServerName serverName, Throwable t) {
|
||||
super.handleFailureToServer(serverName, t);
|
||||
numCallQueueTooBig.incrementAndGet();
|
||||
}
|
||||
|
||||
public CallQueueTooBigPffeInterceptor(Configuration conf) {
|
||||
super(conf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue