HBASE-16654 Better handle channelInactive and close for netty rpc client

This commit is contained in:
zhangduo 2016-09-19 20:52:46 +08:00
parent c67983ebf8
commit 5568929dd2
7 changed files with 61 additions and 40 deletions

View File

@ -62,13 +62,16 @@ class BufferCallBeforeInitHandler extends ChannelDuplexHandler {
private static final BufferCallEvent SUCCESS_EVENT = new BufferCallEvent(BufferCallAction.FLUSH,
null);
private final Map<Integer, Call> id2Call = new HashMap<Integer, Call>();
private final Map<Integer, Call> id2Call = new HashMap<>();
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof Call) {
Call call = (Call) msg;
id2Call.put(call.id, call);
// The call is already in track so here we set the write operation as success.
// We will fail the call directly if we can not write it out.
promise.trySuccess();
} else {
ctx.write(msg, promise);
}
@ -99,5 +102,4 @@ class BufferCallBeforeInitHandler extends ChannelDuplexHandler {
ctx.fireUserEventTriggered(evt);
}
}
}

View File

@ -53,7 +53,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.security.AsyncHBaseSaslRpcClientHandler;
import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
import org.apache.hadoop.hbase.util.Threads;
@ -190,7 +190,7 @@ class NettyRpcConnection extends RpcConnection {
Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
ChannelHandler saslHandler;
try {
saslHandler = new AsyncHBaseSaslRpcClientHandler(saslPromise, ticket, authMethod, token,
saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, authMethod, token,
serverPrincipal, rpcClient.fallbackAllowed, this.rpcClient.conf.get(
"hbase.rpc.protection", QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
} catch (IOException e) {
@ -205,7 +205,7 @@ class NettyRpcConnection extends RpcConnection {
if (future.isSuccess()) {
ChannelPipeline p = ch.pipeline();
p.remove(SaslChallengeDecoder.class);
p.remove(AsyncHBaseSaslRpcClientHandler.class);
p.remove(NettyHBaseSaslRpcClientHandler.class);
established(ch);
} else {
final Throwable error = future.cause();

View File

@ -204,13 +204,18 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
cleanupCalls(ctx, new IOException("Connection closed"));
if (!id2Call.isEmpty()) {
cleanupCalls(ctx, new IOException("Connection closed"));
}
conn.shutdown();
ctx.fireChannelInactive();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cleanupCalls(ctx, IPCUtil.toIOE(cause));
if (!id2Call.isEmpty()) {
cleanupCalls(ctx, IPCUtil.toIOE(cause));
}
conn.shutdown();
}

View File

@ -31,13 +31,13 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
/**
* Implement SASL logic for async rpc client.
* Implement SASL logic for netty rpc client.
*/
@InterfaceAudience.Private
public class AsyncHBaseSaslRpcClient extends AbstractHBaseSaslRpcClient {
private static final Log LOG = LogFactory.getLog(AsyncHBaseSaslRpcClient.class);
public class NettyHBaseSaslRpcClient extends AbstractHBaseSaslRpcClient {
private static final Log LOG = LogFactory.getLog(NettyHBaseSaslRpcClient.class);
public AsyncHBaseSaslRpcClient(AuthMethod method, Token<? extends TokenIdentifier> token,
public NettyHBaseSaslRpcClient(AuthMethod method, Token<? extends TokenIdentifier> token,
String serverPrincipal, boolean fallbackAllowed, String rpcProtection) throws IOException {
super(method, token, serverPrincipal, fallbackAllowed, rpcProtection);
}

View File

@ -34,29 +34,29 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
/**
* Implement SASL logic for async rpc client.
* Implement SASL logic for netty rpc client.
*/
@InterfaceAudience.Private
public class AsyncHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
public class NettyHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static final Log LOG = LogFactory.getLog(AsyncHBaseSaslRpcClientHandler.class);
private static final Log LOG = LogFactory.getLog(NettyHBaseSaslRpcClientHandler.class);
private final Promise<Boolean> saslPromise;
private final UserGroupInformation ugi;
private final AsyncHBaseSaslRpcClient saslRpcClient;
private final NettyHBaseSaslRpcClient saslRpcClient;
/**
* @param saslPromise {@code true} if success, {@code false} if server tells us to fallback to
* simple.
*/
public AsyncHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, UserGroupInformation ugi,
public NettyHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, UserGroupInformation ugi,
AuthMethod method, Token<? extends TokenIdentifier> token, String serverPrincipal,
boolean fallbackAllowed, String rpcProtection) throws IOException {
this.saslPromise = saslPromise;
this.ugi = ugi;
this.saslRpcClient = new AsyncHBaseSaslRpcClient(method, token, serverPrincipal,
this.saslRpcClient = new NettyHBaseSaslRpcClient(method, token, serverPrincipal,
fallbackAllowed, rpcProtection);
}
@ -103,9 +103,9 @@ public class AsyncHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<
if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
saslRpcClient.dispose();
if (saslRpcClient.fallbackAllowed) {
saslPromise.setSuccess(false);
saslPromise.trySuccess(false);
} else {
saslPromise.setFailure(new FallbackDisallowedException());
saslPromise.tryFailure(new FallbackDisallowedException());
}
return;
}
@ -127,9 +127,16 @@ public class AsyncHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<
tryComplete(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
saslRpcClient.dispose();
saslPromise.tryFailure(new IOException("Connection closed"));
ctx.fireChannelInactive();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
saslRpcClient.dispose();
saslPromise.setFailure(cause);
saslPromise.tryFailure(cause);
}
}

View File

@ -42,6 +42,7 @@ public class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
SaslUtil.safeDispose(saslClient);
ctx.fireChannelInactive();
}
@Override

View File

@ -26,6 +26,8 @@ import io.netty.channel.CoalescingBufferQueue;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.PromiseCombiner;
import java.io.IOException;
import javax.security.sasl.SaslClient;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -40,6 +42,10 @@ public class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
private CoalescingBufferQueue queue;
public SaslWrapHandler(SaslClient saslClient) {
this.saslClient = saslClient;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
queue = new CoalescingBufferQueue(ctx.channel());
@ -55,29 +61,26 @@ public class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
}
}
public SaslWrapHandler(SaslClient saslClient) {
this.saslClient = saslClient;
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
if (queue.isEmpty()) {
return;
}
ByteBuf buf = null;
try {
if (!queue.isEmpty()) {
ChannelPromise promise = ctx.newPromise();
int readableBytes = queue.readableBytes();
buf = queue.remove(readableBytes, promise);
byte[] bytes = new byte[readableBytes];
buf.readBytes(bytes);
byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length);
ChannelPromise lenPromise = ctx.newPromise();
ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), lenPromise);
ChannelPromise contentPromise = ctx.newPromise();
ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise);
PromiseCombiner combiner = new PromiseCombiner();
combiner.addAll(lenPromise, contentPromise);
combiner.finish(promise);
}
ChannelPromise promise = ctx.newPromise();
int readableBytes = queue.readableBytes();
buf = queue.remove(readableBytes, promise);
byte[] bytes = new byte[readableBytes];
buf.readBytes(bytes);
byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length);
ChannelPromise lenPromise = ctx.newPromise();
ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), lenPromise);
ChannelPromise contentPromise = ctx.newPromise();
ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise);
PromiseCombiner combiner = new PromiseCombiner();
combiner.addAll(lenPromise, contentPromise);
combiner.finish(promise);
ctx.flush();
} finally {
if (buf != null) {
@ -88,6 +91,9 @@ public class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
queue.releaseAndFailAll(new Throwable("Closed"));
if (!queue.isEmpty()) {
queue.releaseAndFailAll(new IOException("Connection closed"));
}
ctx.close(promise);
}
}