HBASE-15018 Inconsistent way of handling TimeoutException in the rpc client implementations

Signed-off-by: Sean Busbey <busbey@apache.org>
This commit is contained in:
Ashish Singhi 2015-12-26 22:21:33 +05:30 committed by Sean Busbey
parent 9cc01b4029
commit 64553236a9
4 changed files with 92 additions and 72 deletions

View File

@ -24,6 +24,13 @@ import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -34,6 +41,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -41,10 +49,6 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.io.compress.CompressionCodec;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
/**
* Provides the basics for a RpcClient implementation like configuration and Logging.
*/
@ -257,6 +261,33 @@ public abstract class AbstractRpcClient implements RpcClient {
return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
}
/**
* Takes an Exception and the address we were trying to connect to and return an IOException with
* the input exception as the cause. The new exception provides the stack trace of the place where
* the exception is thrown and some extra diagnostics information. If the exception is
* ConnectException or SocketTimeoutException, return a new one of the same type; Otherwise return
* an IOException.
* @param addr target address
* @param exception the relevant exception
* @return an exception to throw
*/
protected IOException wrapException(InetSocketAddress addr, Exception exception) {
if (exception instanceof ConnectException) {
// connection refused; include the host:port in the error
return (ConnectException) new ConnectException("Call to " + addr
+ " failed on connection exception: " + exception).initCause(exception);
} else if (exception instanceof SocketTimeoutException) {
return (SocketTimeoutException) new SocketTimeoutException("Call to " + addr
+ " failed because " + exception).initCause(exception);
} else if (exception instanceof ConnectionClosingException) {
return (ConnectionClosingException) new ConnectionClosingException("Call to " + addr
+ " failed on local exception: " + exception).initCause(exception);
} else {
return (IOException) new IOException("Call to " + addr + " failed on local exception: "
+ exception).initCause(exception);
}
}
/**
* Blocking rpc channel that goes via hbase rpc.
*/

View File

@ -251,10 +251,11 @@ public class AsyncRpcClient extends AbstractRpcClient {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
} else {
throw new IOException(e.getCause());
throw wrapException(addr, (Exception) e.getCause());
}
} catch (TimeoutException e) {
throw new CallTimeoutException(promise.toString());
CallTimeoutException cte = new CallTimeoutException(promise.toString());
throw wrapException(addr, cte);
}
}

View File

@ -19,11 +19,37 @@
package org.apache.hadoop.hbase.ipc;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.Message.Builder;
import com.google.protobuf.RpcCallback;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.SocketFactory;
import javax.security.sasl.SaslException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -70,37 +96,11 @@ import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import javax.net.SocketFactory;
import javax.security.sasl.SaslException;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.Message.Builder;
import com.google.protobuf.RpcCallback;
/**
* Does RPC against a cluster. Manages connections per regionserver in the cluster.
@ -1266,36 +1266,6 @@ public class RpcClientImpl extends AbstractRpcClient {
}
/**
* Take an IOException and the address we were trying to connect to
* and return an IOException with the input exception as the cause.
* The new exception provides the stack trace of the place where
* the exception is thrown and some extra diagnostics information.
* If the exception is ConnectException or SocketTimeoutException,
* return a new one of the same type; Otherwise return an IOException.
*
* @param addr target address
* @param exception the relevant exception
* @return an exception to throw
*/
protected IOException wrapException(InetSocketAddress addr,
IOException exception) {
if (exception instanceof ConnectException) {
//connection refused; include the host:port in the error
return (ConnectException)new ConnectException(
"Call to " + addr + " failed on connection exception: " + exception).initCause(exception);
} else if (exception instanceof SocketTimeoutException) {
return (SocketTimeoutException)new SocketTimeoutException("Call to " + addr +
" failed because " + exception).initCause(exception);
} else if (exception instanceof ConnectionClosingException){
return (ConnectionClosingException) new ConnectionClosingException(
"Call to " + addr + " failed on local exception: " + exception).initCause(exception);
} else {
return (IOException)new IOException("Call to " + addr + " failed on local exception: " +
exception).initCause(exception);
}
}
/**
* Interrupt the connections to the given ip:port server. This should be called if the server
* is known as actually dead. This will not prevent current operation to be retried, and,

View File

@ -26,8 +26,10 @@ import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
@ -42,6 +44,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@ -363,4 +366,19 @@ public abstract class AbstractTestIPC {
rpcServer.stop();
}
}
@Test
public void testWrapException() throws Exception {
AbstractRpcClient client =
(AbstractRpcClient) RpcClientFactory.createClient(CONF, "AbstractTestIPC");
final InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0);
assertTrue(client.wrapException(address, new ConnectException()) instanceof ConnectException);
assertTrue(client.wrapException(address,
new SocketTimeoutException()) instanceof SocketTimeoutException);
assertTrue(client.wrapException(address, new ConnectionClosingException(
"Test AbstractRpcClient#wrapException")) instanceof ConnectionClosingException);
assertTrue(client
.wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException"))
.getCause() instanceof CallTimeoutException);
}
}