From 64553236a972f6a32dbff4c2275e99780b06d744 Mon Sep 17 00:00:00 2001 From: Ashish Singhi Date: Sat, 26 Dec 2015 22:21:33 +0530 Subject: [PATCH] HBASE-15018 Inconsistent way of handling TimeoutException in the rpc client implementations Signed-off-by: Sean Busbey --- .../hadoop/hbase/ipc/AbstractRpcClient.java | 39 ++++++- .../hadoop/hbase/ipc/AsyncRpcClient.java | 5 +- .../hadoop/hbase/ipc/RpcClientImpl.java | 102 +++++++----------- .../hadoop/hbase/ipc/AbstractTestIPC.java | 18 ++++ 4 files changed, 92 insertions(+), 72 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 6f5e78aeb7d..e33ef3a1479 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -24,6 +24,13 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.SocketTimeoutException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -34,6 +41,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.KeyValueCodec; +import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -41,10 +49,6 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.PoolMap; import org.apache.hadoop.io.compress.CompressionCodec; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; - /** * Provides the basics for a RpcClient implementation like configuration and Logging. */ @@ -257,6 +261,33 @@ public abstract class AbstractRpcClient implements RpcClient { return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout); } + /** + * Takes an Exception and the address we were trying to connect to and return an IOException with + * the input exception as the cause. The new exception provides the stack trace of the place where + * the exception is thrown and some extra diagnostics information. If the exception is + * ConnectException or SocketTimeoutException, return a new one of the same type; Otherwise return + * an IOException. + * @param addr target address + * @param exception the relevant exception + * @return an exception to throw + */ + protected IOException wrapException(InetSocketAddress addr, Exception exception) { + if (exception instanceof ConnectException) { + // connection refused; include the host:port in the error + return (ConnectException) new ConnectException("Call to " + addr + + " failed on connection exception: " + exception).initCause(exception); + } else if (exception instanceof SocketTimeoutException) { + return (SocketTimeoutException) new SocketTimeoutException("Call to " + addr + + " failed because " + exception).initCause(exception); + } else if (exception instanceof ConnectionClosingException) { + return (ConnectionClosingException) new ConnectionClosingException("Call to " + addr + + " failed on local exception: " + exception).initCause(exception); + } else { + return (IOException) new IOException("Call to " + addr + " failed on local exception: " + + exception).initCause(exception); + } + } + /** * Blocking rpc channel that goes via hbase rpc. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java index 60e9add9f13..e12e298f682 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java @@ -251,10 +251,11 @@ public class AsyncRpcClient extends AbstractRpcClient { if (e.getCause() instanceof IOException) { throw (IOException) e.getCause(); } else { - throw new IOException(e.getCause()); + throw wrapException(addr, (Exception) e.getCause()); } } catch (TimeoutException e) { - throw new CallTimeoutException(promise.toString()); + CallTimeoutException cte = new CallTimeoutException(promise.toString()); + throw wrapException(addr, cte); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java index 21b257fa80d..1509f5414c8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java @@ -19,11 +19,37 @@ package org.apache.hadoop.hbase.ipc; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.Message.Builder; -import com.google.protobuf.RpcCallback; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.Closeable; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.security.PrivilegedExceptionAction; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.net.SocketFactory; +import javax.security.sasl.SaslException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -70,37 +96,11 @@ import org.apache.htrace.Span; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; -import javax.net.SocketFactory; -import javax.security.sasl.SaslException; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.Closeable; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InterruptedIOException; -import java.io.OutputStream; -import java.net.ConnectException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketAddress; -import java.net.SocketTimeoutException; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.security.PrivilegedExceptionAction; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.Message.Builder; +import com.google.protobuf.RpcCallback; /** * Does RPC against a cluster. Manages connections per regionserver in the cluster. @@ -1266,36 +1266,6 @@ public class RpcClientImpl extends AbstractRpcClient { } - /** - * Take an IOException and the address we were trying to connect to - * and return an IOException with the input exception as the cause. - * The new exception provides the stack trace of the place where - * the exception is thrown and some extra diagnostics information. - * If the exception is ConnectException or SocketTimeoutException, - * return a new one of the same type; Otherwise return an IOException. - * - * @param addr target address - * @param exception the relevant exception - * @return an exception to throw - */ - protected IOException wrapException(InetSocketAddress addr, - IOException exception) { - if (exception instanceof ConnectException) { - //connection refused; include the host:port in the error - return (ConnectException)new ConnectException( - "Call to " + addr + " failed on connection exception: " + exception).initCause(exception); - } else if (exception instanceof SocketTimeoutException) { - return (SocketTimeoutException)new SocketTimeoutException("Call to " + addr + - " failed because " + exception).initCause(exception); - } else if (exception instanceof ConnectionClosingException){ - return (ConnectionClosingException) new ConnectionClosingException( - "Call to " + addr + " failed on local exception: " + exception).initCause(exception); - } else { - return (IOException)new IOException("Call to " + addr + " failed on local exception: " + - exception).initCause(exception); - } - } - /** * Interrupt the connections to the given ip:port server. This should be called if the server * is known as actually dead. This will not prevent current operation to be retried, and, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index 5df1edc29c8..ffe4d40600b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -26,8 +26,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.internal.verification.VerificationModeFactory.times; import java.io.IOException; +import java.net.ConnectException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.List; @@ -42,6 +44,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.MetricsConnection; +import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; @@ -363,4 +366,19 @@ public abstract class AbstractTestIPC { rpcServer.stop(); } } + + @Test + public void testWrapException() throws Exception { + AbstractRpcClient client = + (AbstractRpcClient) RpcClientFactory.createClient(CONF, "AbstractTestIPC"); + final InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0); + assertTrue(client.wrapException(address, new ConnectException()) instanceof ConnectException); + assertTrue(client.wrapException(address, + new SocketTimeoutException()) instanceof SocketTimeoutException); + assertTrue(client.wrapException(address, new ConnectionClosingException( + "Test AbstractRpcClient#wrapException")) instanceof ConnectionClosingException); + assertTrue(client + .wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException")) + .getCause() instanceof CallTimeoutException); + } }