HBASE-15520 Fix broken TestAsyncIPC

Conflicts:
	hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
This commit is contained in:
zhangduo 2016-03-24 10:24:34 +08:00
parent 604415e827
commit 9fa44a8126
3 changed files with 185 additions and 204 deletions

View File

@ -17,18 +17,6 @@
*/ */
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -53,6 +41,7 @@ import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.protobuf.generated.TracingProtos; import org.apache.hadoop.hbase.protobuf.generated.TracingProtos;
import org.apache.hadoop.hbase.security.AuthMethod; import org.apache.hadoop.hbase.security.AuthMethod;
@ -76,6 +65,18 @@ import com.google.protobuf.Descriptors;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcCallback;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
/** /**
* Netty RPC channel * Netty RPC channel
*/ */
@ -85,11 +86,11 @@ public class AsyncRpcChannel {
private static final int MAX_SASL_RETRIES = 5; private static final int MAX_SASL_RETRIES = 5;
protected final static Map<AuthenticationProtos.TokenIdentifier.Kind, TokenSelector<? extends protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDDLERS
TokenIdentifier>> tokenHandlers = new HashMap<>(); = new HashMap<>();
static { static {
tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN, TOKEN_HANDDLERS.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
new AuthenticationTokenSelector()); new AuthenticationTokenSelector());
} }
@ -113,7 +114,6 @@ public class AsyncRpcChannel {
private Token<? extends TokenIdentifier> token; private Token<? extends TokenIdentifier> token;
private String serverPrincipal; private String serverPrincipal;
// NOTE: closed and connected flags below are only changed when a lock on pendingCalls // NOTE: closed and connected flags below are only changed when a lock on pendingCalls
private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>(); private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>();
private boolean connected = false; private boolean connected = false;
@ -130,15 +130,14 @@ public class AsyncRpcChannel {
/** /**
* Constructor for netty RPC channel * Constructor for netty RPC channel
*
* @param bootstrap to construct channel on * @param bootstrap to construct channel on
* @param client to connect with * @param client to connect with
* @param ticket of user which uses connection * @param ticket of user which uses connection
* @param serviceName name of service to connect to * @param serviceName name of service to connect to
* @param address to connect to * @param address to connect to
*/ */
public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket, String public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket,
serviceName, InetSocketAddress address) { String serviceName, InetSocketAddress address) {
this.client = client; this.client = client;
this.ticket = ticket; this.ticket = ticket;
@ -147,16 +146,12 @@ public class AsyncRpcChannel {
this.channel = connect(bootstrap).channel(); this.channel = connect(bootstrap).channel();
name = ("IPC Client (" + channel.hashCode() + ") to " + name = ("IPC Client (" + channel.hashCode() + ") to " + address.toString()
address.toString() + + ((ticket == null) ? " from unknown user" : (" from " + ticket.getName())));
((ticket == null) ?
" from unknown user" :
(" from " + ticket.getName())));
} }
/** /**
* Connect to channel * Connect to channel
*
* @param bootstrap to connect to * @param bootstrap to connect to
* @return future of connection * @return future of connection
*/ */
@ -215,12 +210,11 @@ public class AsyncRpcChannel {
/** /**
* Start HBase connection * Start HBase connection
*
* @param ch channel to start connection on * @param ch channel to start connection on
*/ */
private void startHBaseConnection(Channel ch) { private void startHBaseConnection(Channel ch) {
ch.pipeline() ch.pipeline().addLast("frameDecoder",
.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
ch.pipeline().addLast(new AsyncServerResponseHandler(this)); ch.pipeline().addLast(new AsyncServerResponseHandler(this));
try { try {
writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() { writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() {
@ -254,7 +248,8 @@ public class AsyncRpcChannel {
private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket, private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket,
final Bootstrap bootstrap) throws IOException { final Bootstrap bootstrap) throws IOException {
return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal, return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal,
client.fallbackAllowed, client.conf.get("hbase.rpc.protection", client.fallbackAllowed,
client.conf.get("hbase.rpc.protection",
SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()), SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()),
new SaslClientHandler.SaslExceptionHandler() { new SaslClientHandler.SaslExceptionHandler() {
@Override @Override
@ -312,9 +307,8 @@ public class AsyncRpcChannel {
public Promise<Message> callMethod(final Descriptors.MethodDescriptor method, public Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
final PayloadCarryingRpcController controller, final Message request, final PayloadCarryingRpcController controller, final Message request,
final Message responsePrototype, MetricsConnection.CallStats callStats) { final Message responsePrototype, MetricsConnection.CallStats callStats) {
final AsyncCall call = final AsyncCall call = new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(),
new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), method, request, method, request, controller, responsePrototype, callStats);
controller, responsePrototype, callStats);
controller.notifyOnCancel(new RpcCallback<Object>() { controller.notifyOnCancel(new RpcCallback<Object>() {
@Override @Override
public void run(Object parameter) { public void run(Object parameter) {
@ -340,9 +334,7 @@ public class AsyncRpcChannel {
pendingCalls.put(call.id, call); pendingCalls.put(call.id, call);
// Add timeout for cleanup if none is present // Add timeout for cleanup if none is present
if (cleanupTimer == null && call.getRpcTimeout() > 0) { if (cleanupTimer == null && call.getRpcTimeout() > 0) {
cleanupTimer = cleanupTimer = client.newTimeout(timeoutTask, call.getRpcTimeout(), TimeUnit.MILLISECONDS);
client.newTimeout(timeoutTask, call.getRpcTimeout(),
TimeUnit.MILLISECONDS);
} }
if (!connected) { if (!connected) {
return call; return call;
@ -360,14 +352,13 @@ public class AsyncRpcChannel {
/** /**
* Write the channel header * Write the channel header
*
* @param channel to write to * @param channel to write to
* @return future of write * @return future of write
* @throws java.io.IOException on failure to write * @throws java.io.IOException on failure to write
*/ */
private ChannelFuture writeChannelHeader(Channel channel) throws IOException { private ChannelFuture writeChannelHeader(Channel channel) throws IOException {
RPCProtos.ConnectionHeader.Builder headerBuilder = RPCProtos.ConnectionHeader.Builder headerBuilder = RPCProtos.ConnectionHeader.newBuilder()
RPCProtos.ConnectionHeader.newBuilder().setServiceName(serviceName); .setServiceName(serviceName);
RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod); RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod);
if (userInfoPB != null) { if (userInfoPB != null) {
@ -384,7 +375,6 @@ public class AsyncRpcChannel {
headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo()); headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
RPCProtos.ConnectionHeader header = headerBuilder.build(); RPCProtos.ConnectionHeader header = headerBuilder.build();
int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header); int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
ByteBuf b = channel.alloc().directBuffer(totalSize); ByteBuf b = channel.alloc().directBuffer(totalSize);
@ -397,20 +387,19 @@ public class AsyncRpcChannel {
/** /**
* Write request to channel * Write request to channel
*
* @param call to write * @param call to write
*/ */
private void writeRequest(final AsyncCall call) { private void writeRequest(final AsyncCall call) {
try { try {
final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader
.newBuilder(); .newBuilder();
requestHeaderBuilder.setCallId(call.id) requestHeaderBuilder.setCallId(call.id).setMethodName(call.method.getName())
.setMethodName(call.method.getName()).setRequestParam(call.param != null); .setRequestParam(call.param != null);
if (Trace.isTracing()) { if (Trace.isTracing()) {
Span s = Trace.currentSpan(); Span s = Trace.currentSpan();
requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder(). requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder()
setParentId(s.getSpanId()).setTraceId(s.getTraceId())); .setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
} }
ByteBuffer cellBlock = client.buildCellBlock(call.controller.cellScanner()); ByteBuffer cellBlock = client.buildCellBlock(call.controller.cellScanner());
@ -445,7 +434,6 @@ public class AsyncRpcChannel {
/** /**
* Set up server authorization * Set up server authorization
*
* @throws java.io.IOException if auth setup failed * @throws java.io.IOException if auth setup failed
*/ */
private void setupAuthorization() throws IOException { private void setupAuthorization() throws IOException {
@ -456,10 +444,10 @@ public class AsyncRpcChannel {
if (useSasl && securityInfo != null) { if (useSasl && securityInfo != null) {
AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind(); AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
if (tokenKind != null) { if (tokenKind != null) {
TokenSelector<? extends TokenIdentifier> tokenSelector = tokenHandlers.get(tokenKind); TokenSelector<? extends TokenIdentifier> tokenSelector = TOKEN_HANDDLERS.get(tokenKind);
if (tokenSelector != null) { if (tokenSelector != null) {
token = tokenSelector token = tokenSelector.selectToken(new Text(client.clusterId),
.selectToken(new Text(client.clusterId), ticket.getUGI().getTokens()); ticket.getUGI().getTokens());
} else if (LOG.isDebugEnabled()) { } else if (LOG.isDebugEnabled()) {
LOG.debug("No token selector found for type " + tokenKind); LOG.debug("No token selector found for type " + tokenKind);
} }
@ -485,15 +473,14 @@ public class AsyncRpcChannel {
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Use " + authMethod + " authentication for service " + serviceName + LOG.debug(
", sasl=" + useSasl); "Use " + authMethod + " authentication for service " + serviceName + ", sasl=" + useSasl);
} }
reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000); reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000);
} }
/** /**
* Build the user information * Build the user information
*
* @param ugi User Group Information * @param ugi User Group Information
* @param authMethod Authorization method * @param authMethod Authorization method
* @return UserInformation protobuf * @return UserInformation protobuf
@ -519,7 +506,6 @@ public class AsyncRpcChannel {
/** /**
* Create connection preamble * Create connection preamble
*
* @param byteBuf to write to * @param byteBuf to write to
* @param authMethod to write * @param authMethod to write
*/ */
@ -529,18 +515,7 @@ public class AsyncRpcChannel {
byteBuf.writeByte(authMethod.code); byteBuf.writeByte(authMethod.code);
} }
/** private void close0(Throwable e) {
* Close connection
*
* @param e exception on close
*/
public void close(final Throwable e) {
client.removeConnection(this);
// Move closing from the requesting thread to the channel thread
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
List<AsyncCall> toCleanup; List<AsyncCall> toCleanup;
synchronized (pendingCalls) { synchronized (pendingCalls) {
if (closed) { if (closed) {
@ -567,7 +542,8 @@ public class AsyncRpcChannel {
cleanupTimer = null; cleanupTimer = null;
} }
for (AsyncCall call : toCleanup) { for (AsyncCall call : toCleanup) {
call.setFailed(closeException != null ? closeException : new ConnectionClosingException( call.setFailed(closeException != null ? closeException
: new ConnectionClosingException(
"Call id=" + call.id + " on server " + address + " aborted: connection is closing")); "Call id=" + call.id + " on server " + address + " aborted: connection is closing"));
} }
channel.disconnect().addListener(ChannelFutureListener.CLOSE); channel.disconnect().addListener(ChannelFutureListener.CLOSE);
@ -575,8 +551,26 @@ public class AsyncRpcChannel {
LOG.debug(name + ": closed"); LOG.debug(name + ": closed");
} }
} }
/**
* Close connection
* @param e exception on close
*/
public void close(final Throwable e) {
client.removeConnection(this);
// Move closing from the requesting thread to the channel thread
if (channel.eventLoop().inEventLoop()) {
close0(e);
} else {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
close0(e);
}
}); });
} }
}
/** /**
* Clean up calls. * Clean up calls.
@ -601,9 +595,7 @@ public class AsyncRpcChannel {
} }
} }
if (nextCleanupTaskDelay > 0) { if (nextCleanupTaskDelay > 0) {
cleanupTimer = cleanupTimer = client.newTimeout(timeoutTask, nextCleanupTaskDelay, TimeUnit.MILLISECONDS);
client.newTimeout(timeoutTask, nextCleanupTaskDelay,
TimeUnit.MILLISECONDS);
} else { } else {
cleanupTimer = null; cleanupTimer = null;
} }
@ -616,7 +608,6 @@ public class AsyncRpcChannel {
/** /**
* Check if the connection is alive * Check if the connection is alive
*
* @return true if alive * @return true if alive
*/ */
public boolean isAlive() { public boolean isAlive() {
@ -625,7 +616,6 @@ public class AsyncRpcChannel {
/** /**
* Check if user should authenticate over Kerberos * Check if user should authenticate over Kerberos
*
* @return true if should be authenticated over Kerberos * @return true if should be authenticated over Kerberos
* @throws java.io.IOException on failure of check * @throws java.io.IOException on failure of check
*/ */
@ -633,8 +623,7 @@ public class AsyncRpcChannel {
UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
UserGroupInformation realUser = currentUser.getRealUser(); UserGroupInformation realUser = currentUser.getRealUser();
return authMethod == AuthMethod.KERBEROS && return authMethod == AuthMethod.KERBEROS && loginUser != null &&
loginUser != null &&
// Make sure user logged in using Kerberos either keytab or TGT // Make sure user logged in using Kerberos either keytab or TGT
loginUser.hasKerberosCredentials() && loginUser.hasKerberosCredentials() &&
// relogin only in case it is the login user (e.g. JT) // relogin only in case it is the login user (e.g. JT)
@ -643,23 +632,18 @@ public class AsyncRpcChannel {
} }
/** /**
* If multiple clients with the same principal try to connect * If multiple clients with the same principal try to connect to the same server at the same time,
* to the same server at the same time, the server assumes a * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to
* replay attack is in progress. This is a feature of kerberos. * work around this, what is done is that the client backs off randomly and tries to initiate the
* In order to work around this, what is done is that the client * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is
* backs off randomly and tries to initiate the connection * attempted.
* again.
* The other problem is to do with ticket expiry. To handle that,
* a relogin is attempted.
* <p> * <p>
* The retry logic is governed by the {@link #shouldAuthenticateOverKrb} * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the
* method. In case when the user doesn't have valid credentials, we don't * user doesn't have valid credentials, we don't need to retry (from cache or ticket). In such
* need to retry (from cache or ticket). In such cases, it is prudent to * cases, it is prudent to throw a runtime exception when we receive a SaslException from the
* throw a runtime exception when we receive a SaslException from the * underlying authentication implementation, so there is no retry from other high level (for eg,
* underlying authentication implementation, so there is no retry from * HCM or HBaseAdmin).
* other high level (for eg, HCM or HBaseAdmin).
* </p> * </p>
*
* @param currRetries retry count * @param currRetries retry count
* @param ex exception describing fail * @param ex exception describing fail
* @param user which is trying to connect * @param user which is trying to connect
@ -684,23 +668,20 @@ public class AsyncRpcChannel {
// Should reconnect // Should reconnect
return null; return null;
} else { } else {
String msg = "Couldn't setup connection for " + String msg = "Couldn't setup connection for "
UserGroupInformation.getLoginUser().getUserName() + + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal;
" to " + serverPrincipal;
LOG.warn(msg); LOG.warn(msg);
throw (IOException) new IOException(msg).initCause(ex); throw (IOException) new IOException(msg).initCause(ex);
} }
} else { } else {
LOG.warn("Exception encountered while connecting to " + LOG.warn("Exception encountered while connecting to " + "the server : " + ex);
"the server : " + ex);
} }
if (ex instanceof RemoteException) { if (ex instanceof RemoteException) {
throw (RemoteException) ex; throw (RemoteException) ex;
} }
if (ex instanceof SaslException) { if (ex instanceof SaslException) {
String msg = "SASL authentication failed." + String msg = "SASL authentication failed."
" The most likely cause is missing or invalid credentials." + + " The most likely cause is missing or invalid credentials." + " Consider 'kinit'.";
" Consider 'kinit'.";
LOG.fatal(msg, ex); LOG.fatal(msg, ex);
throw new RuntimeException(msg, ex); throw new RuntimeException(msg, ex);
} }
@ -727,7 +708,6 @@ public class AsyncRpcChannel {
return false; return false;
} }
@Override @Override
public String toString() { public String toString() {
return this.address.toString() + "/" + this.serviceName + "/" + this.ticket; return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;

View File

@ -17,11 +17,6 @@
*/ */
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
@ -32,27 +27,30 @@ import org.apache.hadoop.ipc.RemoteException;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/** /**
* Handles Hbase responses * Handles Hbase responses
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter { public class AsyncServerResponseHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final AsyncRpcChannel channel; private final AsyncRpcChannel channel;
/** /**
* Constructor * Constructor
*
* @param channel on which this response handler operates * @param channel on which this response handler operates
*/ */
public AsyncServerResponseHandler(AsyncRpcChannel channel) { public AsyncServerResponseHandler(AsyncRpcChannel channel) {
this.channel = channel; this.channel = channel;
} }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { @Override
ByteBuf inBuffer = (ByteBuf) msg; protected void channelRead0(ChannelHandlerContext ctx, ByteBuf inBuffer) throws Exception {
ByteBufInputStream in = new ByteBufInputStream(inBuffer); ByteBufInputStream in = new ByteBufInputStream(inBuffer);
int totalSize = inBuffer.readableBytes(); int totalSize = inBuffer.readableBytes();
try {
// Read the header // Read the header
RPCProtos.ResponseHeader responseHeader = RPCProtos.ResponseHeader.parseDelimitedFrom(in); RPCProtos.ResponseHeader responseHeader = RPCProtos.ResponseHeader.parseDelimitedFrom(in);
int id = responseHeader.getCallId(); int id = responseHeader.getCallId();
@ -76,8 +74,8 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
if (responseHeader.hasException()) { if (responseHeader.hasException()) {
RPCProtos.ExceptionResponse exceptionResponse = responseHeader.getException(); RPCProtos.ExceptionResponse exceptionResponse = responseHeader.getException();
RemoteException re = createRemoteException(exceptionResponse); RemoteException re = createRemoteException(exceptionResponse);
if (exceptionResponse.getExceptionClassName(). if (exceptionResponse.getExceptionClassName()
equals(FatalConnectionException.class.getName())) { .equals(FatalConnectionException.class.getName())) {
channel.close(re); channel.close(re);
} else { } else {
call.setFailed(re); call.setFailed(re);
@ -100,12 +98,16 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
call.setSuccess(value, cellBlockScanner); call.setSuccess(value, cellBlockScanner);
call.callStats.setResponseSizeBytes(totalSize); call.callStats.setResponseSizeBytes(totalSize);
} }
} catch (IOException e) {
// Treat this as a fatal condition and close this connection
channel.close(e);
} finally {
inBuffer.release();
} }
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
channel.close(cause);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
channel.close(new IOException("connection closed"));
} }
/** /**
@ -118,7 +120,7 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
return e.hasHostname() ? return e.hasHostname() ?
// If a hostname then add it to the RemoteWithExtrasException // If a hostname then add it to the RemoteWithExtrasException
new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(), new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
e.getPort(), doNotRetry) : e.getPort(), doNotRetry)
new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry); : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry);
} }
} }

View File

@ -57,7 +57,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.http.ConnectionClosedException;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -320,7 +319,7 @@ public abstract class AbstractTestIPC {
md.getOutputType().toProto(), User.getCurrent(), address, md.getOutputType().toProto(), User.getCurrent(), address,
new MetricsConnection.CallStats()); new MetricsConnection.CallStats());
fail("RPC should have failed because it exceeds max request size"); fail("RPC should have failed because it exceeds max request size");
} catch(ConnectionClosingException | ConnectionClosedException ex) { } catch(IOException ex) {
// pass // pass
} }
} finally { } finally {