HBASE-15018 Inconsistent way of handling TimeoutException in the rpc client implemenations (Ashish Singhi)
This commit is contained in:
parent
c0ad4cdd7a
commit
59cca6297f
|
@ -24,6 +24,13 @@ import com.google.protobuf.Descriptors;
|
||||||
import com.google.protobuf.Message;
|
import com.google.protobuf.Message;
|
||||||
import com.google.protobuf.RpcController;
|
import com.google.protobuf.RpcController;
|
||||||
import com.google.protobuf.ServiceException;
|
import com.google.protobuf.ServiceException;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.ConnectException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.SocketAddress;
|
||||||
|
import java.net.SocketTimeoutException;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -34,6 +41,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.client.MetricsConnection;
|
import org.apache.hadoop.hbase.client.MetricsConnection;
|
||||||
import org.apache.hadoop.hbase.codec.Codec;
|
import org.apache.hadoop.hbase.codec.Codec;
|
||||||
import org.apache.hadoop.hbase.codec.KeyValueCodec;
|
import org.apache.hadoop.hbase.codec.KeyValueCodec;
|
||||||
|
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.security.UserProvider;
|
import org.apache.hadoop.hbase.security.UserProvider;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
@ -41,10 +49,6 @@ import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.util.PoolMap;
|
import org.apache.hadoop.hbase.util.PoolMap;
|
||||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.net.SocketAddress;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provides the basics for a RpcClient implementation like configuration and Logging.
|
* Provides the basics for a RpcClient implementation like configuration and Logging.
|
||||||
*/
|
*/
|
||||||
|
@ -257,6 +261,33 @@ public abstract class AbstractRpcClient implements RpcClient {
|
||||||
return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
|
return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Takes an Exception and the address we were trying to connect to and return an IOException with
|
||||||
|
* the input exception as the cause. The new exception provides the stack trace of the place where
|
||||||
|
* the exception is thrown and some extra diagnostics information. If the exception is
|
||||||
|
* ConnectException or SocketTimeoutException, return a new one of the same type; Otherwise return
|
||||||
|
* an IOException.
|
||||||
|
* @param addr target address
|
||||||
|
* @param exception the relevant exception
|
||||||
|
* @return an exception to throw
|
||||||
|
*/
|
||||||
|
protected IOException wrapException(InetSocketAddress addr, Exception exception) {
|
||||||
|
if (exception instanceof ConnectException) {
|
||||||
|
// connection refused; include the host:port in the error
|
||||||
|
return (ConnectException) new ConnectException("Call to " + addr
|
||||||
|
+ " failed on connection exception: " + exception).initCause(exception);
|
||||||
|
} else if (exception instanceof SocketTimeoutException) {
|
||||||
|
return (SocketTimeoutException) new SocketTimeoutException("Call to " + addr
|
||||||
|
+ " failed because " + exception).initCause(exception);
|
||||||
|
} else if (exception instanceof ConnectionClosingException) {
|
||||||
|
return (ConnectionClosingException) new ConnectionClosingException("Call to " + addr
|
||||||
|
+ " failed on local exception: " + exception).initCause(exception);
|
||||||
|
} else {
|
||||||
|
return (IOException) new IOException("Call to " + addr + " failed on local exception: "
|
||||||
|
+ exception).initCause(exception);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Blocking rpc channel that goes via hbase rpc.
|
* Blocking rpc channel that goes via hbase rpc.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -248,13 +248,10 @@ public class AsyncRpcClient extends AbstractRpcClient {
|
||||||
Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get();
|
Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get();
|
||||||
return new Pair<>(response, pcrc.cellScanner());
|
return new Pair<>(response, pcrc.cellScanner());
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException e) {
|
||||||
if (e.getCause() instanceof IOException) {
|
throw wrapException(addr, e);
|
||||||
throw (IOException) e.getCause();
|
|
||||||
} else {
|
|
||||||
throw new IOException(e.getCause());
|
|
||||||
}
|
|
||||||
} catch (TimeoutException e) {
|
} catch (TimeoutException e) {
|
||||||
throw new CallTimeoutException(promise.toString());
|
CallTimeoutException cte = new CallTimeoutException(promise.toString());
|
||||||
|
throw wrapException(addr, cte);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -81,7 +81,6 @@ import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.net.ConnectException;
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
|
@ -1266,36 +1265,6 @@ public class RpcClientImpl extends AbstractRpcClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Take an IOException and the address we were trying to connect to
|
|
||||||
* and return an IOException with the input exception as the cause.
|
|
||||||
* The new exception provides the stack trace of the place where
|
|
||||||
* the exception is thrown and some extra diagnostics information.
|
|
||||||
* If the exception is ConnectException or SocketTimeoutException,
|
|
||||||
* return a new one of the same type; Otherwise return an IOException.
|
|
||||||
*
|
|
||||||
* @param addr target address
|
|
||||||
* @param exception the relevant exception
|
|
||||||
* @return an exception to throw
|
|
||||||
*/
|
|
||||||
protected IOException wrapException(InetSocketAddress addr,
|
|
||||||
IOException exception) {
|
|
||||||
if (exception instanceof ConnectException) {
|
|
||||||
//connection refused; include the host:port in the error
|
|
||||||
return (ConnectException)new ConnectException(
|
|
||||||
"Call to " + addr + " failed on connection exception: " + exception).initCause(exception);
|
|
||||||
} else if (exception instanceof SocketTimeoutException) {
|
|
||||||
return (SocketTimeoutException)new SocketTimeoutException("Call to " + addr +
|
|
||||||
" failed because " + exception).initCause(exception);
|
|
||||||
} else if (exception instanceof ConnectionClosingException){
|
|
||||||
return (ConnectionClosingException) new ConnectionClosingException(
|
|
||||||
"Call to " + addr + " failed on local exception: " + exception).initCause(exception);
|
|
||||||
} else {
|
|
||||||
return (IOException)new IOException("Call to " + addr + " failed on local exception: " +
|
|
||||||
exception).initCause(exception);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Interrupt the connections to the given ip:port server. This should be called if the server
|
* Interrupt the connections to the given ip:port server. This should be called if the server
|
||||||
* is known as actually dead. This will not prevent current operation to be retried, and,
|
* is known as actually dead. This will not prevent current operation to be retried, and,
|
||||||
|
|
Loading…
Reference in New Issue