diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java index 573ddd5a6fd..c628c3172c9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java @@ -62,13 +62,16 @@ class BufferCallBeforeInitHandler extends ChannelDuplexHandler { private static final BufferCallEvent SUCCESS_EVENT = new BufferCallEvent(BufferCallAction.FLUSH, null); - private final Map id2Call = new HashMap(); + private final Map id2Call = new HashMap<>(); @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { if (msg instanceof Call) { Call call = (Call) msg; id2Call.put(call.id, call); + // The call is already in track so here we set the write operation as success. + // We will fail the call directly if we can not write it out. + promise.trySuccess(); } else { ctx.write(msg, promise); } @@ -99,5 +102,4 @@ class BufferCallBeforeInitHandler extends ChannelDuplexHandler { ctx.fireUserEventTriggered(evt); } } - } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java index 559b7f9464e..8a85580782b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java @@ -53,7 +53,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent; import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader; -import org.apache.hadoop.hbase.security.AsyncHBaseSaslRpcClientHandler; +import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler; import org.apache.hadoop.hbase.security.SaslChallengeDecoder; import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; import org.apache.hadoop.hbase.util.Threads; @@ -190,7 +190,7 @@ class NettyRpcConnection extends RpcConnection { Promise saslPromise = ch.eventLoop().newPromise(); ChannelHandler saslHandler; try { - saslHandler = new AsyncHBaseSaslRpcClientHandler(saslPromise, ticket, authMethod, token, + saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, authMethod, token, serverPrincipal, rpcClient.fallbackAllowed, this.rpcClient.conf.get( "hbase.rpc.protection", QualityOfProtection.AUTHENTICATION.name().toLowerCase())); } catch (IOException e) { @@ -205,7 +205,7 @@ class NettyRpcConnection extends RpcConnection { if (future.isSuccess()) { ChannelPipeline p = ch.pipeline(); p.remove(SaslChallengeDecoder.class); - p.remove(AsyncHBaseSaslRpcClientHandler.class); + p.remove(NettyHBaseSaslRpcClientHandler.class); established(ch); } else { final Throwable error = future.cause(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java index 1cd89d8af37..5faaede082a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java @@ -204,13 +204,18 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - cleanupCalls(ctx, new IOException("Connection closed")); + if (!id2Call.isEmpty()) { + cleanupCalls(ctx, new IOException("Connection closed")); + } conn.shutdown(); + ctx.fireChannelInactive(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - cleanupCalls(ctx, IPCUtil.toIOE(cause)); + if (!id2Call.isEmpty()) { + cleanupCalls(ctx, IPCUtil.toIOE(cause)); + } conn.shutdown(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java similarity index 89% rename from hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClient.java rename to hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java index df703dcd179..f624608e0d1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java @@ -31,13 +31,13 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; /** - * Implement SASL logic for async rpc client. + * Implement SASL logic for netty rpc client. */ @InterfaceAudience.Private -public class AsyncHBaseSaslRpcClient extends AbstractHBaseSaslRpcClient { - private static final Log LOG = LogFactory.getLog(AsyncHBaseSaslRpcClient.class); +public class NettyHBaseSaslRpcClient extends AbstractHBaseSaslRpcClient { + private static final Log LOG = LogFactory.getLog(NettyHBaseSaslRpcClient.class); - public AsyncHBaseSaslRpcClient(AuthMethod method, Token token, + public NettyHBaseSaslRpcClient(AuthMethod method, Token token, String serverPrincipal, boolean fallbackAllowed, String rpcProtection) throws IOException { super(method, token, serverPrincipal, fallbackAllowed, rpcProtection); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClientHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java similarity index 85% rename from hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClientHandler.java rename to hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java index bccfa300ea5..50609b4b3e1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClientHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java @@ -34,29 +34,29 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; /** - * Implement SASL logic for async rpc client. + * Implement SASL logic for netty rpc client. */ @InterfaceAudience.Private -public class AsyncHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler { +public class NettyHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler { - private static final Log LOG = LogFactory.getLog(AsyncHBaseSaslRpcClientHandler.class); + private static final Log LOG = LogFactory.getLog(NettyHBaseSaslRpcClientHandler.class); private final Promise saslPromise; private final UserGroupInformation ugi; - private final AsyncHBaseSaslRpcClient saslRpcClient; + private final NettyHBaseSaslRpcClient saslRpcClient; /** * @param saslPromise {@code true} if success, {@code false} if server tells us to fallback to * simple. */ - public AsyncHBaseSaslRpcClientHandler(Promise saslPromise, UserGroupInformation ugi, + public NettyHBaseSaslRpcClientHandler(Promise saslPromise, UserGroupInformation ugi, AuthMethod method, Token token, String serverPrincipal, boolean fallbackAllowed, String rpcProtection) throws IOException { this.saslPromise = saslPromise; this.ugi = ugi; - this.saslRpcClient = new AsyncHBaseSaslRpcClient(method, token, serverPrincipal, + this.saslRpcClient = new NettyHBaseSaslRpcClient(method, token, serverPrincipal, fallbackAllowed, rpcProtection); } @@ -103,9 +103,9 @@ public class AsyncHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler< if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) { saslRpcClient.dispose(); if (saslRpcClient.fallbackAllowed) { - saslPromise.setSuccess(false); + saslPromise.trySuccess(false); } else { - saslPromise.setFailure(new FallbackDisallowedException()); + saslPromise.tryFailure(new FallbackDisallowedException()); } return; } @@ -127,9 +127,16 @@ public class AsyncHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler< tryComplete(ctx); } + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + saslRpcClient.dispose(); + saslPromise.tryFailure(new IOException("Connection closed")); + ctx.fireChannelInactive(); + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { saslRpcClient.dispose(); - saslPromise.setFailure(cause); + saslPromise.tryFailure(cause); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java index c2faf913379..e631478940f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java @@ -42,6 +42,7 @@ public class SaslUnwrapHandler extends SimpleChannelInboundHandler { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { SaslUtil.safeDispose(saslClient); + ctx.fireChannelInactive(); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java index fefb4f8d6ec..14ecf2e5e3c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java @@ -26,6 +26,8 @@ import io.netty.channel.CoalescingBufferQueue; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.PromiseCombiner; +import java.io.IOException; + import javax.security.sasl.SaslClient; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -40,6 +42,10 @@ public class SaslWrapHandler extends ChannelOutboundHandlerAdapter { private CoalescingBufferQueue queue; + public SaslWrapHandler(SaslClient saslClient) { + this.saslClient = saslClient; + } + @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { queue = new CoalescingBufferQueue(ctx.channel()); @@ -55,29 +61,26 @@ public class SaslWrapHandler extends ChannelOutboundHandlerAdapter { } } - public SaslWrapHandler(SaslClient saslClient) { - this.saslClient = saslClient; - } - @Override public void flush(ChannelHandlerContext ctx) throws Exception { + if (queue.isEmpty()) { + return; + } ByteBuf buf = null; try { - if (!queue.isEmpty()) { - ChannelPromise promise = ctx.newPromise(); - int readableBytes = queue.readableBytes(); - buf = queue.remove(readableBytes, promise); - byte[] bytes = new byte[readableBytes]; - buf.readBytes(bytes); - byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length); - ChannelPromise lenPromise = ctx.newPromise(); - ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), lenPromise); - ChannelPromise contentPromise = ctx.newPromise(); - ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise); - PromiseCombiner combiner = new PromiseCombiner(); - combiner.addAll(lenPromise, contentPromise); - combiner.finish(promise); - } + ChannelPromise promise = ctx.newPromise(); + int readableBytes = queue.readableBytes(); + buf = queue.remove(readableBytes, promise); + byte[] bytes = new byte[readableBytes]; + buf.readBytes(bytes); + byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length); + ChannelPromise lenPromise = ctx.newPromise(); + ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), lenPromise); + ChannelPromise contentPromise = ctx.newPromise(); + ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise); + PromiseCombiner combiner = new PromiseCombiner(); + combiner.addAll(lenPromise, contentPromise); + combiner.finish(promise); ctx.flush(); } finally { if (buf != null) { @@ -88,6 +91,9 @@ public class SaslWrapHandler extends ChannelOutboundHandlerAdapter { @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { - queue.releaseAndFailAll(new Throwable("Closed")); + if (!queue.isEmpty()) { + queue.releaseAndFailAll(new IOException("Connection closed")); + } + ctx.close(promise); } }