diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java index 47d7234621e..204b812f17d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java @@ -71,8 +71,8 @@ class NettyRpcConnection extends RpcConnection { private static final Log LOG = LogFactory.getLog(NettyRpcConnection.class); - private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors - .newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin")); + private static final ScheduledExecutorService RELOGIN_EXECUTOR = + Executors.newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin")); private final NettyRpcClient rpcClient; @@ -89,8 +89,8 @@ class NettyRpcConnection extends RpcConnection { rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor); this.rpcClient = rpcClient; byte[] connectionHeaderPreamble = getConnectionHeaderPreamble(); - this.connectionHeaderPreamble = Unpooled.directBuffer(connectionHeaderPreamble.length) - .writeBytes(connectionHeaderPreamble); + this.connectionHeaderPreamble = + Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble); ConnectionHeader header = getConnectionHeader(); this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize()); this.connectionHeaderWithLength.writeInt(header.getSerializedSize()); @@ -215,8 +215,8 @@ class NettyRpcConnection extends RpcConnection { // add ReadTimeoutHandler to deal with server doesn't response connection header // because of the different configuration in client side and server side - p.addFirst(new ReadTimeoutHandler( - RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS)); + p.addFirst( + new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS)); p.addLast(chHandler); connectionHeaderPromise.addListener(new FutureListener() { @Override @@ -281,9 +281,23 @@ class NettyRpcConnection extends RpcConnection { }).channel(); } + private void write(Channel ch, final Call call) { + ch.writeAndFlush(call).addListener(new ChannelFutureListener() { + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + // Fail the call if we failed to write it out. This usually because the channel is + // closed. This is needed because we may shutdown the channel inside event loop and + // there may still be some pending calls in the event loop queue after us. + if (!future.isSuccess()) { + call.setException(toIOE(future.cause())); + } + } + }); + } + @Override - public synchronized void sendRequest(final Call call, HBaseRpcController hrc) - throws IOException { + public synchronized void sendRequest(final Call call, HBaseRpcController hrc) throws IOException { if (reloginInProgress) { throw new IOException("Can not send request because relogin is in progress."); } @@ -309,18 +323,29 @@ class NettyRpcConnection extends RpcConnection { connect(); } scheduleTimeoutTask(call); - channel.writeAndFlush(call).addListener(new ChannelFutureListener() { + final Channel ch = channel; + // We must move the whole writeAndFlush call inside event loop otherwise there will be a + // race condition. + // In netty's DefaultChannelPipeline, it will find the first outbound handler in the + // current thread and then schedule a task to event loop which will start the process from + // that outbound handler. It is possible that the first handler is + // BufferCallBeforeInitHandler when we call writeAndFlush here, but the connection is set + // up at the same time so in the event loop thread we remove the + // BufferCallBeforeInitHandler, and then our writeAndFlush task comes, still calls the + // write method of BufferCallBeforeInitHandler. + // This may be considered as a bug of netty, but anyway there is a work around so let's + // fix it by ourselves first. + if (ch.eventLoop().inEventLoop()) { + write(ch, call); + } else { + ch.eventLoop().execute(new Runnable() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - // Fail the call if we failed to write it out. This usually because the channel is - // closed. This is needed because we may shutdown the channel inside event loop and - // there may still be some pending calls in the event loop queue after us. - if (!future.isSuccess()) { - call.setException(toIOE(future.cause())); + @Override + public void run() { + write(ch, call); } - } - }); + }); + } } } });