From 9fa44a8126bd23d6dff5db2091bc5279fabc7e6e Mon Sep 17 00:00:00 2001 From: zhangduo Date: Thu, 24 Mar 2016 10:24:34 +0800 Subject: [PATCH] HBASE-15520 Fix broken TestAsyncIPC Conflicts: hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java --- .../hadoop/hbase/ipc/AsyncRpcChannel.java | 258 ++++++++---------- .../hbase/ipc/AsyncServerResponseHandler.java | 128 ++++----- .../hadoop/hbase/ipc/AbstractTestIPC.java | 3 +- 3 files changed, 185 insertions(+), 204 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java index 9fe2cf67fd8..37ed4135494 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java @@ -17,18 +17,6 @@ */ package org.apache.hadoop.hbase.ipc; -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufOutputStream; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; -import io.netty.util.concurrent.GenericFutureListener; -import io.netty.util.concurrent.Promise; - import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; @@ -53,6 +41,7 @@ import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; +import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.protobuf.generated.TracingProtos; import org.apache.hadoop.hbase.security.AuthMethod; @@ -76,6 +65,18 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import com.google.protobuf.RpcCallback; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.Promise; + /** * Netty RPC channel */ @@ -85,12 +86,12 @@ public class AsyncRpcChannel { private static final int MAX_SASL_RETRIES = 5; - protected final static Map> tokenHandlers = new HashMap<>(); + protected final static Map> TOKEN_HANDDLERS + = new HashMap<>(); static { - tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN, - new AuthenticationTokenSelector()); + TOKEN_HANDDLERS.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN, + new AuthenticationTokenSelector()); } final AsyncRpcClient client; @@ -113,7 +114,6 @@ public class AsyncRpcChannel { private Token token; private String serverPrincipal; - // NOTE: closed and connected flags below are only changed when a lock on pendingCalls private final Map pendingCalls = new HashMap(); private boolean connected = false; @@ -130,15 +130,14 @@ public class AsyncRpcChannel { /** * Constructor for netty RPC channel - * * @param bootstrap to construct channel on - * @param client to connect with + * @param client to connect with * @param ticket of user which uses connection * @param serviceName name of service to connect to * @param address to connect to */ - public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket, String - serviceName, InetSocketAddress address) { + public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket, + String serviceName, InetSocketAddress address) { this.client = client; this.ticket = ticket; @@ -147,16 +146,12 @@ public class AsyncRpcChannel { this.channel = connect(bootstrap).channel(); - name = ("IPC Client (" + channel.hashCode() + ") to " + - address.toString() + - ((ticket == null) ? - " from unknown user" : - (" from " + ticket.getName()))); + name = ("IPC Client (" + channel.hashCode() + ") to " + address.toString() + + ((ticket == null) ? " from unknown user" : (" from " + ticket.getName()))); } /** * Connect to channel - * * @param bootstrap to connect to * @return future of connection */ @@ -215,12 +210,11 @@ public class AsyncRpcChannel { /** * Start HBase connection - * * @param ch channel to start connection on */ private void startHBaseConnection(Channel ch) { - ch.pipeline() - .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); + ch.pipeline().addLast("frameDecoder", + new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); ch.pipeline().addLast(new AsyncServerResponseHandler(this)); try { writeChannelHeader(ch).addListener(new GenericFutureListener() { @@ -254,7 +248,8 @@ public class AsyncRpcChannel { private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket, final Bootstrap bootstrap) throws IOException { return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal, - client.fallbackAllowed, client.conf.get("hbase.rpc.protection", + client.fallbackAllowed, + client.conf.get("hbase.rpc.protection", SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()), new SaslClientHandler.SaslExceptionHandler() { @Override @@ -312,9 +307,8 @@ public class AsyncRpcChannel { public Promise callMethod(final Descriptors.MethodDescriptor method, final PayloadCarryingRpcController controller, final Message request, final Message responsePrototype, MetricsConnection.CallStats callStats) { - final AsyncCall call = - new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), method, request, - controller, responsePrototype, callStats); + final AsyncCall call = new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), + method, request, controller, responsePrototype, callStats); controller.notifyOnCancel(new RpcCallback() { @Override public void run(Object parameter) { @@ -340,9 +334,7 @@ public class AsyncRpcChannel { pendingCalls.put(call.id, call); // Add timeout for cleanup if none is present if (cleanupTimer == null && call.getRpcTimeout() > 0) { - cleanupTimer = - client.newTimeout(timeoutTask, call.getRpcTimeout(), - TimeUnit.MILLISECONDS); + cleanupTimer = client.newTimeout(timeoutTask, call.getRpcTimeout(), TimeUnit.MILLISECONDS); } if (!connected) { return call; @@ -360,14 +352,13 @@ public class AsyncRpcChannel { /** * Write the channel header - * * @param channel to write to * @return future of write * @throws java.io.IOException on failure to write */ private ChannelFuture writeChannelHeader(Channel channel) throws IOException { - RPCProtos.ConnectionHeader.Builder headerBuilder = - RPCProtos.ConnectionHeader.newBuilder().setServiceName(serviceName); + RPCProtos.ConnectionHeader.Builder headerBuilder = RPCProtos.ConnectionHeader.newBuilder() + .setServiceName(serviceName); RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod); if (userInfoPB != null) { @@ -384,7 +375,6 @@ public class AsyncRpcChannel { headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo()); RPCProtos.ConnectionHeader header = headerBuilder.build(); - int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header); ByteBuf b = channel.alloc().directBuffer(totalSize); @@ -397,20 +387,19 @@ public class AsyncRpcChannel { /** * Write request to channel - * - * @param call to write + * @param call to write */ private void writeRequest(final AsyncCall call) { try { final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader .newBuilder(); - requestHeaderBuilder.setCallId(call.id) - .setMethodName(call.method.getName()).setRequestParam(call.param != null); + requestHeaderBuilder.setCallId(call.id).setMethodName(call.method.getName()) + .setRequestParam(call.param != null); if (Trace.isTracing()) { Span s = Trace.currentSpan(); - requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder(). - setParentId(s.getSpanId()).setTraceId(s.getTraceId())); + requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder() + .setParentId(s.getSpanId()).setTraceId(s.getTraceId())); } ByteBuffer cellBlock = client.buildCellBlock(call.controller.cellScanner()); @@ -420,7 +409,7 @@ public class AsyncRpcChannel { cellBlockBuilder.setLength(cellBlock.limit()); requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build()); } - // Only pass priority if there one. Let zero be same as no priority. + // Only pass priority if there one. Let zero be same as no priority. if (call.controller.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) { requestHeaderBuilder.setPriority(call.controller.getPriority()); } @@ -433,7 +422,7 @@ public class AsyncRpcChannel { } ByteBuf b = channel.alloc().directBuffer(4 + totalSize); - try(ByteBufOutputStream out = new ByteBufOutputStream(b)) { + try (ByteBufOutputStream out = new ByteBufOutputStream(b)) { call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock)); } @@ -445,7 +434,6 @@ public class AsyncRpcChannel { /** * Set up server authorization - * * @throws java.io.IOException if auth setup failed */ private void setupAuthorization() throws IOException { @@ -456,10 +444,10 @@ public class AsyncRpcChannel { if (useSasl && securityInfo != null) { AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind(); if (tokenKind != null) { - TokenSelector tokenSelector = tokenHandlers.get(tokenKind); + TokenSelector tokenSelector = TOKEN_HANDDLERS.get(tokenKind); if (tokenSelector != null) { - token = tokenSelector - .selectToken(new Text(client.clusterId), ticket.getUGI().getTokens()); + token = tokenSelector.selectToken(new Text(client.clusterId), + ticket.getUGI().getTokens()); } else if (LOG.isDebugEnabled()) { LOG.debug("No token selector found for type " + tokenKind); } @@ -469,7 +457,7 @@ public class AsyncRpcChannel { throw new IOException("Can't obtain server Kerberos config key from SecurityInfo"); } this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey), - address.getAddress().getCanonicalHostName().toLowerCase()); + address.getAddress().getCanonicalHostName().toLowerCase()); if (LOG.isDebugEnabled()) { LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is " + serverPrincipal); @@ -485,16 +473,15 @@ public class AsyncRpcChannel { } if (LOG.isDebugEnabled()) { - LOG.debug("Use " + authMethod + " authentication for service " + serviceName + - ", sasl=" + useSasl); + LOG.debug( + "Use " + authMethod + " authentication for service " + serviceName + ", sasl=" + useSasl); } reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000); } /** * Build the user information - * - * @param ugi User Group Information + * @param ugi User Group Information * @param authMethod Authorization method * @return UserInformation protobuf */ @@ -508,7 +495,7 @@ public class AsyncRpcChannel { // Send effective user for Kerberos auth userInfoPB.setEffectiveUser(ugi.getUserName()); } else if (authMethod == AuthMethod.SIMPLE) { - //Send both effective user and real user for simple auth + // Send both effective user and real user for simple auth userInfoPB.setEffectiveUser(ugi.getUserName()); if (ugi.getRealUser() != null) { userInfoPB.setRealUser(ugi.getRealUser().getUserName()); @@ -519,8 +506,7 @@ public class AsyncRpcChannel { /** * Create connection preamble - * - * @param byteBuf to write to + * @param byteBuf to write to * @param authMethod to write */ private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) { @@ -529,53 +515,61 @@ public class AsyncRpcChannel { byteBuf.writeByte(authMethod.code); } + private void close0(Throwable e) { + List toCleanup; + synchronized (pendingCalls) { + if (closed) { + return; + } + closed = true; + toCleanup = new ArrayList(pendingCalls.values()); + pendingCalls.clear(); + } + IOException closeException = null; + if (e != null) { + if (e instanceof IOException) { + closeException = (IOException) e; + } else { + closeException = new IOException(e); + } + } + // log the info + if (LOG.isDebugEnabled() && closeException != null) { + LOG.debug(name + ": closing ipc connection to " + address, closeException); + } + if (cleanupTimer != null) { + cleanupTimer.cancel(); + cleanupTimer = null; + } + for (AsyncCall call : toCleanup) { + call.setFailed(closeException != null ? closeException + : new ConnectionClosingException( + "Call id=" + call.id + " on server " + address + " aborted: connection is closing")); + } + channel.disconnect().addListener(ChannelFutureListener.CLOSE); + if (LOG.isDebugEnabled()) { + LOG.debug(name + ": closed"); + } + } + /** * Close connection - * * @param e exception on close */ public void close(final Throwable e) { client.removeConnection(this); // Move closing from the requesting thread to the channel thread - channel.eventLoop().execute(new Runnable() { - @Override - public void run() { - List toCleanup; - synchronized (pendingCalls) { - if (closed) { - return; - } - closed = true; - toCleanup = new ArrayList(pendingCalls.values()); - pendingCalls.clear(); + if (channel.eventLoop().inEventLoop()) { + close0(e); + } else { + channel.eventLoop().execute(new Runnable() { + @Override + public void run() { + close0(e); } - IOException closeException = null; - if (e != null) { - if (e instanceof IOException) { - closeException = (IOException) e; - } else { - closeException = new IOException(e); - } - } - // log the info - if (LOG.isDebugEnabled() && closeException != null) { - LOG.debug(name + ": closing ipc connection to " + address, closeException); - } - if (cleanupTimer != null) { - cleanupTimer.cancel(); - cleanupTimer = null; - } - for (AsyncCall call : toCleanup) { - call.setFailed(closeException != null ? closeException : new ConnectionClosingException( - "Call id=" + call.id + " on server " + address + " aborted: connection is closing")); - } - channel.disconnect().addListener(ChannelFutureListener.CLOSE); - if (LOG.isDebugEnabled()) { - LOG.debug(name + ": closed"); - } - } - }); + }); + } } /** @@ -601,9 +595,7 @@ public class AsyncRpcChannel { } } if (nextCleanupTaskDelay > 0) { - cleanupTimer = - client.newTimeout(timeoutTask, nextCleanupTaskDelay, - TimeUnit.MILLISECONDS); + cleanupTimer = client.newTimeout(timeoutTask, nextCleanupTaskDelay, TimeUnit.MILLISECONDS); } else { cleanupTimer = null; } @@ -616,7 +608,6 @@ public class AsyncRpcChannel { /** * Check if the connection is alive - * * @return true if alive */ public boolean isAlive() { @@ -625,7 +616,6 @@ public class AsyncRpcChannel { /** * Check if user should authenticate over Kerberos - * * @return true if should be authenticated over Kerberos * @throws java.io.IOException on failure of check */ @@ -633,37 +623,31 @@ public class AsyncRpcChannel { UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); UserGroupInformation realUser = currentUser.getRealUser(); - return authMethod == AuthMethod.KERBEROS && - loginUser != null && - //Make sure user logged in using Kerberos either keytab or TGT - loginUser.hasKerberosCredentials() && - // relogin only in case it is the login user (e.g. JT) - // or superuser (like oozie). - (loginUser.equals(currentUser) || loginUser.equals(realUser)); + return authMethod == AuthMethod.KERBEROS && loginUser != null && + // Make sure user logged in using Kerberos either keytab or TGT + loginUser.hasKerberosCredentials() && + // relogin only in case it is the login user (e.g. JT) + // or superuser (like oozie). + (loginUser.equals(currentUser) || loginUser.equals(realUser)); } /** - * If multiple clients with the same principal try to connect - * to the same server at the same time, the server assumes a - * replay attack is in progress. This is a feature of kerberos. - * In order to work around this, what is done is that the client - * backs off randomly and tries to initiate the connection - * again. - * The other problem is to do with ticket expiry. To handle that, - * a relogin is attempted. + * If multiple clients with the same principal try to connect to the same server at the same time, + * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to + * work around this, what is done is that the client backs off randomly and tries to initiate the + * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is + * attempted. *

- * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} - * method. In case when the user doesn't have valid credentials, we don't - * need to retry (from cache or ticket). In such cases, it is prudent to - * throw a runtime exception when we receive a SaslException from the - * underlying authentication implementation, so there is no retry from - * other high level (for eg, HCM or HBaseAdmin). + * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the + * user doesn't have valid credentials, we don't need to retry (from cache or ticket). In such + * cases, it is prudent to throw a runtime exception when we receive a SaslException from the + * underlying authentication implementation, so there is no retry from other high level (for eg, + * HCM or HBaseAdmin). *

- * * @param currRetries retry count - * @param ex exception describing fail - * @param user which is trying to connect - * @throws java.io.IOException if IO fail + * @param ex exception describing fail + * @param user which is trying to connect + * @throws java.io.IOException if IO fail * @throws InterruptedException if thread is interrupted */ private void handleSaslConnectionFailure(final int currRetries, final Throwable ex, @@ -674,7 +658,7 @@ public class AsyncRpcChannel { if (shouldAuthenticateOverKrb()) { if (currRetries < MAX_SASL_RETRIES) { LOG.debug("Exception encountered while connecting to the server : " + ex); - //try re-login + // try re-login if (UserGroupInformation.isLoginKeytabBased()) { UserGroupInformation.getLoginUser().reloginFromKeytab(); } else { @@ -684,23 +668,20 @@ public class AsyncRpcChannel { // Should reconnect return null; } else { - String msg = "Couldn't setup connection for " + - UserGroupInformation.getLoginUser().getUserName() + - " to " + serverPrincipal; + String msg = "Couldn't setup connection for " + + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal; LOG.warn(msg); throw (IOException) new IOException(msg).initCause(ex); } } else { - LOG.warn("Exception encountered while connecting to " + - "the server : " + ex); + LOG.warn("Exception encountered while connecting to " + "the server : " + ex); } if (ex instanceof RemoteException) { throw (RemoteException) ex; } if (ex instanceof SaslException) { - String msg = "SASL authentication failed." + - " The most likely cause is missing or invalid credentials." + - " Consider 'kinit'."; + String msg = "SASL authentication failed." + + " The most likely cause is missing or invalid credentials." + " Consider 'kinit'."; LOG.fatal(msg, ex); throw new RuntimeException(msg, ex); } @@ -727,7 +708,6 @@ public class AsyncRpcChannel { return false; } - @Override public String toString() { return this.address.toString() + "/" + this.serviceName + "/" + this.ticket; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java index 8f6c85b06bd..e0c7586477c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java @@ -17,11 +17,6 @@ */ package org.apache.hadoop.hbase.ipc; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufInputStream; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; - import java.io.IOException; import org.apache.hadoop.hbase.CellScanner; @@ -32,80 +27,87 @@ import org.apache.hadoop.ipc.RemoteException; import com.google.protobuf.Message; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; + /** * Handles Hbase responses */ @InterfaceAudience.Private -public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter { +public class AsyncServerResponseHandler extends SimpleChannelInboundHandler { private final AsyncRpcChannel channel; /** * Constructor - * * @param channel on which this response handler operates */ public AsyncServerResponseHandler(AsyncRpcChannel channel) { this.channel = channel; } - @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - ByteBuf inBuffer = (ByteBuf) msg; + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf inBuffer) throws Exception { ByteBufInputStream in = new ByteBufInputStream(inBuffer); int totalSize = inBuffer.readableBytes(); - try { - // Read the header - RPCProtos.ResponseHeader responseHeader = RPCProtos.ResponseHeader.parseDelimitedFrom(in); - int id = responseHeader.getCallId(); - AsyncCall call = channel.removePendingCall(id); - if (call == null) { - // So we got a response for which we have no corresponding 'call' here on the client-side. - // We probably timed out waiting, cleaned up all references, and now the server decides - // to return a response. There is nothing we can do w/ the response at this stage. Clean - // out the wire of the response so its out of the way and we can get other responses on - // this connection. - int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader); - int whatIsLeftToRead = totalSize - readSoFar; + // Read the header + RPCProtos.ResponseHeader responseHeader = RPCProtos.ResponseHeader.parseDelimitedFrom(in); + int id = responseHeader.getCallId(); + AsyncCall call = channel.removePendingCall(id); + if (call == null) { + // So we got a response for which we have no corresponding 'call' here on the client-side. + // We probably timed out waiting, cleaned up all references, and now the server decides + // to return a response. There is nothing we can do w/ the response at this stage. Clean + // out the wire of the response so its out of the way and we can get other responses on + // this connection. + int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader); + int whatIsLeftToRead = totalSize - readSoFar; - // This is done through a Netty ByteBuf which has different behavior than InputStream. - // It does not return number of bytes read but will update pointer internally and throws an - // exception when too many bytes are to be skipped. - inBuffer.skipBytes(whatIsLeftToRead); - return; - } - - if (responseHeader.hasException()) { - RPCProtos.ExceptionResponse exceptionResponse = responseHeader.getException(); - RemoteException re = createRemoteException(exceptionResponse); - if (exceptionResponse.getExceptionClassName(). - equals(FatalConnectionException.class.getName())) { - channel.close(re); - } else { - call.setFailed(re); - } - } else { - Message value = null; - // Call may be null because it may have timedout and been cleaned up on this side already - if (call.responseDefaultType != null) { - Message.Builder builder = call.responseDefaultType.newBuilderForType(); - ProtobufUtil.mergeDelimitedFrom(builder, in); - value = builder.build(); - } - CellScanner cellBlockScanner = null; - if (responseHeader.hasCellBlockMeta()) { - int size = responseHeader.getCellBlockMeta().getLength(); - byte[] cellBlock = new byte[size]; - inBuffer.readBytes(cellBlock, 0, cellBlock.length); - cellBlockScanner = channel.client.createCellScanner(cellBlock); - } - call.setSuccess(value, cellBlockScanner); - call.callStats.setResponseSizeBytes(totalSize); - } - } catch (IOException e) { - // Treat this as a fatal condition and close this connection - channel.close(e); - } finally { - inBuffer.release(); + // This is done through a Netty ByteBuf which has different behavior than InputStream. + // It does not return number of bytes read but will update pointer internally and throws an + // exception when too many bytes are to be skipped. + inBuffer.skipBytes(whatIsLeftToRead); + return; } + + if (responseHeader.hasException()) { + RPCProtos.ExceptionResponse exceptionResponse = responseHeader.getException(); + RemoteException re = createRemoteException(exceptionResponse); + if (exceptionResponse.getExceptionClassName() + .equals(FatalConnectionException.class.getName())) { + channel.close(re); + } else { + call.setFailed(re); + } + } else { + Message value = null; + // Call may be null because it may have timedout and been cleaned up on this side already + if (call.responseDefaultType != null) { + Message.Builder builder = call.responseDefaultType.newBuilderForType(); + ProtobufUtil.mergeDelimitedFrom(builder, in); + value = builder.build(); + } + CellScanner cellBlockScanner = null; + if (responseHeader.hasCellBlockMeta()) { + int size = responseHeader.getCellBlockMeta().getLength(); + byte[] cellBlock = new byte[size]; + inBuffer.readBytes(cellBlock, 0, cellBlock.length); + cellBlockScanner = channel.client.createCellScanner(cellBlock); + } + call.setSuccess(value, cellBlockScanner); + call.callStats.setResponseSizeBytes(totalSize); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + channel.close(cause); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + channel.close(new IOException("connection closed")); } /** @@ -118,7 +120,7 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter { return e.hasHostname() ? // If a hostname then add it to the RemoteWithExtrasException new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(), - e.getPort(), doNotRetry) : - new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry); + e.getPort(), doNotRetry) + : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index e8da9eec5f4..69c8fe246f4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -57,7 +57,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.util.StringUtils; -import org.apache.http.ConnectionClosedException; import org.junit.Assert; import org.junit.Test; @@ -320,7 +319,7 @@ public abstract class AbstractTestIPC { md.getOutputType().toProto(), User.getCurrent(), address, new MetricsConnection.CallStats()); fail("RPC should have failed because it exceeds max request size"); - } catch(ConnectionClosingException | ConnectionClosedException ex) { + } catch(IOException ex) { // pass } } finally {