HBASE-18199 Race in NettyRpcConnection may cause call stuck in BufferCallBeforeInitHandler forever

This commit is contained in:
zhangduo 2017-06-10 19:11:46 +08:00
parent 1aedc07b52
commit eca1ec3356
1 changed files with 43 additions and 18 deletions

View File

@ -71,8 +71,8 @@ class NettyRpcConnection extends RpcConnection {
private static final Log LOG = LogFactory.getLog(NettyRpcConnection.class); private static final Log LOG = LogFactory.getLog(NettyRpcConnection.class);
private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors private static final ScheduledExecutorService RELOGIN_EXECUTOR =
.newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin")); Executors.newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));
private final NettyRpcClient rpcClient; private final NettyRpcClient rpcClient;
@ -89,8 +89,8 @@ class NettyRpcConnection extends RpcConnection {
rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor); rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
this.rpcClient = rpcClient; this.rpcClient = rpcClient;
byte[] connectionHeaderPreamble = getConnectionHeaderPreamble(); byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
this.connectionHeaderPreamble = Unpooled.directBuffer(connectionHeaderPreamble.length) this.connectionHeaderPreamble =
.writeBytes(connectionHeaderPreamble); Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
ConnectionHeader header = getConnectionHeader(); ConnectionHeader header = getConnectionHeader();
this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize()); this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize());
this.connectionHeaderWithLength.writeInt(header.getSerializedSize()); this.connectionHeaderWithLength.writeInt(header.getSerializedSize());
@ -215,8 +215,8 @@ class NettyRpcConnection extends RpcConnection {
// add ReadTimeoutHandler to deal with server doesn't response connection header // add ReadTimeoutHandler to deal with server doesn't response connection header
// because of the different configuration in client side and server side // because of the different configuration in client side and server side
p.addFirst(new ReadTimeoutHandler( p.addFirst(
RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS)); new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS));
p.addLast(chHandler); p.addLast(chHandler);
connectionHeaderPromise.addListener(new FutureListener<Boolean>() { connectionHeaderPromise.addListener(new FutureListener<Boolean>() {
@Override @Override
@ -281,9 +281,23 @@ class NettyRpcConnection extends RpcConnection {
}).channel(); }).channel();
} }
private void write(Channel ch, final Call call) {
ch.writeAndFlush(call).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Fail the call if we failed to write it out. This usually because the channel is
// closed. This is needed because we may shutdown the channel inside event loop and
// there may still be some pending calls in the event loop queue after us.
if (!future.isSuccess()) {
call.setException(toIOE(future.cause()));
}
}
});
}
@Override @Override
public synchronized void sendRequest(final Call call, HBaseRpcController hrc) public synchronized void sendRequest(final Call call, HBaseRpcController hrc) throws IOException {
throws IOException {
if (reloginInProgress) { if (reloginInProgress) {
throw new IOException("Can not send request because relogin is in progress."); throw new IOException("Can not send request because relogin is in progress.");
} }
@ -309,18 +323,29 @@ class NettyRpcConnection extends RpcConnection {
connect(); connect();
} }
scheduleTimeoutTask(call); scheduleTimeoutTask(call);
channel.writeAndFlush(call).addListener(new ChannelFutureListener() { final Channel ch = channel;
// We must move the whole writeAndFlush call inside event loop otherwise there will be a
// race condition.
// In netty's DefaultChannelPipeline, it will find the first outbound handler in the
// current thread and then schedule a task to event loop which will start the process from
// that outbound handler. It is possible that the first handler is
// BufferCallBeforeInitHandler when we call writeAndFlush here, but the connection is set
// up at the same time so in the event loop thread we remove the
// BufferCallBeforeInitHandler, and then our writeAndFlush task comes, still calls the
// write method of BufferCallBeforeInitHandler.
// This may be considered as a bug of netty, but anyway there is a work around so let's
// fix it by ourselves first.
if (ch.eventLoop().inEventLoop()) {
write(ch, call);
} else {
ch.eventLoop().execute(new Runnable() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void run() {
// Fail the call if we failed to write it out. This usually because the channel is write(ch, call);
// closed. This is needed because we may shutdown the channel inside event loop and
// there may still be some pending calls in the event loop queue after us.
if (!future.isSuccess()) {
call.setException(toIOE(future.cause()));
} }
} });
}); }
} }
} }
}); });