HBASE-18199 Race in NettyRpcConnection may cause call stuck in BufferCallBeforeInitHandler forever

Author: zhangduo
Date: 2017-06-10 19:11:46 +08:00
parent eb2dc5d2a5
commit ea64dbef7f
1 changed file with 43 additions and 18 deletions
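
For context, the handler named in the commit title, BufferCallBeforeInitHandler, queues outbound calls until the connection is fully set up. The sketch below is a hypothetical, much-simplified stand-in (not HBase's actual handler; the class name and the INIT_DONE marker are invented for illustration) showing the failure mode: a write that reaches the handler after it has already drained its queue and left the pipeline is buffered again and never flushed, so the call hangs forever.

import java.util.ArrayDeque;
import java.util.Queue;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

// Hypothetical, simplified buffering handler; names and the INIT_DONE event are
// illustrative only and do not match HBase's real BufferCallBeforeInitHandler.
class BufferingHandlerSketch extends ChannelDuplexHandler {

  static final Object INIT_DONE = new Object(); // assumed "connection ready" marker

  private final Queue<Object> pending = new ArrayDeque<>();

  @Override
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    // Buffer the outbound message instead of forwarding it (promises ignored for brevity).
    // A write that lands here after INIT_DONE has already been processed is the stuck
    // call described in the commit title: nothing will ever drain it again.
    pending.add(msg);
  }

  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt == INIT_DONE) {
      // Connection is ready: flush everything buffered so far and leave the pipeline.
      for (Object msg; (msg = pending.poll()) != null;) {
        ctx.write(msg);
      }
      ctx.flush();
      ctx.pipeline().remove(this);
    } else {
      ctx.fireUserEventTriggered(evt);
    }
  }
}

The race arises because writeAndFlush, when called from outside the event loop, resolves the pipeline's first outbound handler on the calling thread and only then schedules the actual write on the event loop; if the event loop removes the buffering handler in between, the scheduled write still goes through the removed handler.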

NettyRpcConnection.java

@@ -71,8 +71,8 @@ class NettyRpcConnection extends RpcConnection {
   private static final Log LOG = LogFactory.getLog(NettyRpcConnection.class);
-  private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors
-      .newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));
+  private static final ScheduledExecutorService RELOGIN_EXECUTOR =
+      Executors.newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));
   private final NettyRpcClient rpcClient;
@@ -89,8 +89,8 @@ class NettyRpcConnection extends RpcConnection {
         rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
     this.rpcClient = rpcClient;
     byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
-    this.connectionHeaderPreamble = Unpooled.directBuffer(connectionHeaderPreamble.length)
-        .writeBytes(connectionHeaderPreamble);
+    this.connectionHeaderPreamble =
+      Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
     ConnectionHeader header = getConnectionHeader();
     this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize());
     this.connectionHeaderWithLength.writeInt(header.getSerializedSize());
@@ -215,8 +215,8 @@ class NettyRpcConnection extends RpcConnection {
             // add ReadTimeoutHandler to deal with server doesn't response connection header
             // because of the different configuration in client side and server side
-            p.addFirst(new ReadTimeoutHandler(
-              RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS));
+            p.addFirst(
+              new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS));
             p.addLast(chHandler);
             connectionHeaderPromise.addListener(new FutureListener<Boolean>() {
               @Override
@@ -281,9 +281,23 @@ class NettyRpcConnection extends RpcConnection {
         }).channel();
   }
+  private void write(Channel ch, final Call call) {
+    ch.writeAndFlush(call).addListener(new ChannelFutureListener() {
+      @Override
+      public void operationComplete(ChannelFuture future) throws Exception {
+        // Fail the call if we failed to write it out. This usually because the channel is
+        // closed. This is needed because we may shutdown the channel inside event loop and
+        // there may still be some pending calls in the event loop queue after us.
+        if (!future.isSuccess()) {
+          call.setException(toIOE(future.cause()));
+        }
+      }
+    });
+  }
   @Override
-  public synchronized void sendRequest(final Call call, HBaseRpcController hrc)
-      throws IOException {
+  public synchronized void sendRequest(final Call call, HBaseRpcController hrc) throws IOException {
     if (reloginInProgress) {
       throw new IOException("Can not send request because relogin is in progress.");
     }
@@ -309,18 +323,29 @@ class NettyRpcConnection extends RpcConnection {
             connect();
           }
           scheduleTimeoutTask(call);
-          channel.writeAndFlush(call).addListener(new ChannelFutureListener() {
+          final Channel ch = channel;
+          // We must move the whole writeAndFlush call inside event loop otherwise there will be a
+          // race condition.
+          // In netty's DefaultChannelPipeline, it will find the first outbound handler in the
+          // current thread and then schedule a task to event loop which will start the process from
+          // that outbound handler. It is possible that the first handler is
+          // BufferCallBeforeInitHandler when we call writeAndFlush here, but the connection is set
+          // up at the same time so in the event loop thread we remove the
+          // BufferCallBeforeInitHandler, and then our writeAndFlush task comes, still calls the
+          // write method of BufferCallBeforeInitHandler.
+          // This may be considered as a bug of netty, but anyway there is a work around so let's
+          // fix it by ourselves first.
+          if (ch.eventLoop().inEventLoop()) {
+            write(ch, call);
+          } else {
+            ch.eventLoop().execute(new Runnable() {
-            @Override
-            public void operationComplete(ChannelFuture future) throws Exception {
-              // Fail the call if we failed to write it out. This usually because the channel is
-              // closed. This is needed because we may shutdown the channel inside event loop and
-              // there may still be some pending calls in the event loop queue after us.
-              if (!future.isSuccess()) {
-                call.setException(toIOE(future.cause()));
+              @Override
+              public void run() {
+                write(ch, call);
               }
-            }
-          });
+            });
+          }
         }
       }
     });
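
Distilled from the patch above, this is the general Netty idiom the fix relies on: hop onto the channel's event loop before calling writeAndFlush, so the write is serialized with any pipeline changes made on that loop and cannot see a stale first outbound handler. The class and method names below are illustrative sketches, not part of HBase.

import io.netty.channel.Channel;
import io.netty.channel.EventLoop;

final class EventLoopDispatchSketch {

  private EventLoopDispatchSketch() {
  }

  // Perform writeAndFlush from inside the channel's event loop, mirroring the
  // inEventLoop()/execute() pattern introduced by the patch above.
  static void writeInEventLoop(final Channel ch, final Object msg) {
    EventLoop loop = ch.eventLoop();
    if (loop.inEventLoop()) {
      // Already on the event loop thread: write directly.
      ch.writeAndFlush(msg);
    } else {
      // Schedule the write on the event loop; the pipeline then cannot change
      // between resolving the first outbound handler and performing the write.
      loop.execute(new Runnable() {
        @Override
        public void run() {
          ch.writeAndFlush(msg);
        }
      });
    }
  }
}

The anonymous Runnable matches the Java 7 style of the patch; on newer Java the same dispatch is usually written as loop.execute(() -> ch.writeAndFlush(msg)).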