HBASE-15018 Inconsistent way of handling TimeoutException in the rpc client implementations

Signed-off-by: Sean Busbey <busbey@apache.org>
This commit is contained in:
Ashish Singhi 2015-12-26 22:21:33 +05:30 committed by Sean Busbey
parent 5e2c2e1780
commit 413d663f92
4 changed files with 92 additions and 72 deletions

View File

@ -24,6 +24,13 @@ import com.google.protobuf.Descriptors;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -34,6 +41,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec; import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -41,10 +49,6 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PoolMap; import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodec;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
/** /**
* Provides the basics for a RpcClient implementation like configuration and Logging. * Provides the basics for a RpcClient implementation like configuration and Logging.
*/ */
@ -257,6 +261,33 @@ public abstract class AbstractRpcClient implements RpcClient {
return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout); return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
} }
/**
* Takes an Exception and the address we were trying to connect to and return an IOException with
* the input exception as the cause. The new exception provides the stack trace of the place where
* the exception is thrown and some extra diagnostics information. If the exception is
* ConnectException or SocketTimeoutException, return a new one of the same type; Otherwise return
* an IOException.
* @param addr target address
* @param exception the relevant exception
* @return an exception to throw
*/
protected IOException wrapException(InetSocketAddress addr, Exception exception) {
if (exception instanceof ConnectException) {
// connection refused; include the host:port in the error
return (ConnectException) new ConnectException("Call to " + addr
+ " failed on connection exception: " + exception).initCause(exception);
} else if (exception instanceof SocketTimeoutException) {
return (SocketTimeoutException) new SocketTimeoutException("Call to " + addr
+ " failed because " + exception).initCause(exception);
} else if (exception instanceof ConnectionClosingException) {
return (ConnectionClosingException) new ConnectionClosingException("Call to " + addr
+ " failed on local exception: " + exception).initCause(exception);
} else {
return (IOException) new IOException("Call to " + addr + " failed on local exception: "
+ exception).initCause(exception);
}
}
/** /**
* Blocking rpc channel that goes via hbase rpc. * Blocking rpc channel that goes via hbase rpc.
*/ */

View File

@ -251,10 +251,11 @@ public class AsyncRpcClient extends AbstractRpcClient {
if (e.getCause() instanceof IOException) { if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause(); throw (IOException) e.getCause();
} else { } else {
throw new IOException(e.getCause()); throw wrapException(addr, (Exception) e.getCause());
} }
} catch (TimeoutException e) { } catch (TimeoutException e) {
throw new CallTimeoutException(promise.toString()); CallTimeoutException cte = new CallTimeoutException(promise.toString());
throw wrapException(addr, cte);
} }
} }

View File

@ -19,11 +19,37 @@
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import com.google.common.annotations.VisibleForTesting; import java.io.BufferedInputStream;
import com.google.protobuf.Descriptors.MethodDescriptor; import java.io.BufferedOutputStream;
import com.google.protobuf.Message; import java.io.Closeable;
import com.google.protobuf.Message.Builder; import java.io.DataInputStream;
import com.google.protobuf.RpcCallback; import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.SocketFactory;
import javax.security.sasl.SaslException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -70,37 +96,11 @@ import org.apache.htrace.Span;
import org.apache.htrace.Trace; import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope; import org.apache.htrace.TraceScope;
import javax.net.SocketFactory; import com.google.common.annotations.VisibleForTesting;
import javax.security.sasl.SaslException; import com.google.protobuf.Descriptors.MethodDescriptor;
import java.io.BufferedInputStream; import com.google.protobuf.Message;
import java.io.BufferedOutputStream; import com.google.protobuf.Message.Builder;
import java.io.Closeable; import com.google.protobuf.RpcCallback;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* Does RPC against a cluster. Manages connections per regionserver in the cluster. * Does RPC against a cluster. Manages connections per regionserver in the cluster.
@ -1266,36 +1266,6 @@ public class RpcClientImpl extends AbstractRpcClient {
} }
/**
* Take an IOException and the address we were trying to connect to
* and return an IOException with the input exception as the cause.
* The new exception provides the stack trace of the place where
* the exception is thrown and some extra diagnostics information.
* If the exception is ConnectException or SocketTimeoutException,
* return a new one of the same type; Otherwise return an IOException.
*
* @param addr target address
* @param exception the relevant exception
* @return an exception to throw
*/
protected IOException wrapException(InetSocketAddress addr,
IOException exception) {
if (exception instanceof ConnectException) {
//connection refused; include the host:port in the error
return (ConnectException)new ConnectException(
"Call to " + addr + " failed on connection exception: " + exception).initCause(exception);
} else if (exception instanceof SocketTimeoutException) {
return (SocketTimeoutException)new SocketTimeoutException("Call to " + addr +
" failed because " + exception).initCause(exception);
} else if (exception instanceof ConnectionClosingException){
return (ConnectionClosingException) new ConnectionClosingException(
"Call to " + addr + " failed on local exception: " + exception).initCause(exception);
} else {
return (IOException)new IOException("Call to " + addr + " failed on local exception: " +
exception).initCause(exception);
}
}
/** /**
* Interrupt the connections to the given ip:port server. This should be called if the server * Interrupt the connections to the given ip:port server. This should be called if the server
* is known as actually dead. This will not prevent current operation to be retried, and, * is known as actually dead. This will not prevent current operation to be retried, and,

View File

@ -26,8 +26,10 @@ import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times; import static org.mockito.internal.verification.VerificationModeFactory.times;
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -42,6 +44,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@ -363,4 +366,19 @@ public abstract class AbstractTestIPC {
rpcServer.stop(); rpcServer.stop();
} }
} }
@Test
public void testWrapException() throws Exception {
AbstractRpcClient client =
(AbstractRpcClient) RpcClientFactory.createClient(CONF, "AbstractTestIPC");
final InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0);
assertTrue(client.wrapException(address, new ConnectException()) instanceof ConnectException);
assertTrue(client.wrapException(address,
new SocketTimeoutException()) instanceof SocketTimeoutException);
assertTrue(client.wrapException(address, new ConnectionClosingException(
"Test AbstractRpcClient#wrapException")) instanceof ConnectionClosingException);
assertTrue(client
.wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException"))
.getCause() instanceof CallTimeoutException);
}
} }