Revert "HBASE-15018 Inconsistent way of handling TimeoutException in the rpc client implemenations (Ashish Singhi)"

This reverts commit e00a04df10.
This commit is contained in:
stack 2015-12-23 15:31:10 -08:00
parent 8e0854c64b
commit 04de427e57
3 changed files with 41 additions and 38 deletions

View File

@ -24,13 +24,6 @@ import com.google.protobuf.Descriptors;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -41,7 +34,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec; import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -49,6 +41,10 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PoolMap; import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodec;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
/** /**
* Provides the basics for a RpcClient implementation like configuration and Logging. * Provides the basics for a RpcClient implementation like configuration and Logging.
*/ */
@ -261,33 +257,6 @@ public abstract class AbstractRpcClient implements RpcClient {
return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout); return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
} }
/**
* Takes an Exception and the address we were trying to connect to and return an IOException with
* the input exception as the cause. The new exception provides the stack trace of the place where
* the exception is thrown and some extra diagnostics information. If the exception is
* ConnectException or SocketTimeoutException, return a new one of the same type; Otherwise return
* an IOException.
* @param addr target address
* @param exception the relevant exception
* @return an exception to throw
*/
protected IOException wrapException(InetSocketAddress addr, Exception exception) {
if (exception instanceof ConnectException) {
// connection refused; include the host:port in the error
return (ConnectException) new ConnectException("Call to " + addr
+ " failed on connection exception: " + exception).initCause(exception);
} else if (exception instanceof SocketTimeoutException) {
return (SocketTimeoutException) new SocketTimeoutException("Call to " + addr
+ " failed because " + exception).initCause(exception);
} else if (exception instanceof ConnectionClosingException) {
return (ConnectionClosingException) new ConnectionClosingException("Call to " + addr
+ " failed on local exception: " + exception).initCause(exception);
} else {
return (IOException) new IOException("Call to " + addr + " failed on local exception: "
+ exception).initCause(exception);
}
}
/** /**
* Blocking rpc channel that goes via hbase rpc. * Blocking rpc channel that goes via hbase rpc.
*/ */

View File

@ -248,10 +248,13 @@ public class AsyncRpcClient extends AbstractRpcClient {
Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get(); Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get();
return new Pair<>(response, pcrc.cellScanner()); return new Pair<>(response, pcrc.cellScanner());
} catch (ExecutionException e) { } catch (ExecutionException e) {
throw wrapException(addr, e); if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
} else {
throw new IOException(e.getCause());
}
} catch (TimeoutException e) { } catch (TimeoutException e) {
CallTimeoutException cte = new CallTimeoutException(promise.toString()); throw new CallTimeoutException(promise.toString());
throw wrapException(addr, cte);
} }
} }

View File

@ -81,6 +81,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.net.SocketAddress; import java.net.SocketAddress;
@ -1265,6 +1266,36 @@ public class RpcClientImpl extends AbstractRpcClient {
} }
/**
* Take an IOException and the address we were trying to connect to
* and return an IOException with the input exception as the cause.
* The new exception provides the stack trace of the place where
* the exception is thrown and some extra diagnostics information.
* If the exception is ConnectException or SocketTimeoutException,
* return a new one of the same type; Otherwise return an IOException.
*
* @param addr target address
* @param exception the relevant exception
* @return an exception to throw
*/
protected IOException wrapException(InetSocketAddress addr,
IOException exception) {
if (exception instanceof ConnectException) {
//connection refused; include the host:port in the error
return (ConnectException)new ConnectException(
"Call to " + addr + " failed on connection exception: " + exception).initCause(exception);
} else if (exception instanceof SocketTimeoutException) {
return (SocketTimeoutException)new SocketTimeoutException("Call to " + addr +
" failed because " + exception).initCause(exception);
} else if (exception instanceof ConnectionClosingException){
return (ConnectionClosingException) new ConnectionClosingException(
"Call to " + addr + " failed on local exception: " + exception).initCause(exception);
} else {
return (IOException)new IOException("Call to " + addr + " failed on local exception: " +
exception).initCause(exception);
}
}
/** /**
* Interrupt the connections to the given ip:port server. This should be called if the server * Interrupt the connections to the given ip:port server. This should be called if the server
* is known as actually dead. This will not prevent current operation to be retried, and, * is known as actually dead. This will not prevent current operation to be retried, and,