HBASE-27271 BufferCallBeforeInitHandler should ignore the flush request (#4676)

Signed-off-by: Balazs Meszaros <meszibalu@apache.org>
(cherry picked from commit fb529e2352)

Conflicts:
	hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
Author: Duo Zhang
Date:   2022-08-04 22:31:58 +08:00
Parent: e93ff10b01
Commit: 1094b154ab

2 changed files with 35 additions and 18 deletions

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java

@@ -33,6 +33,8 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
 @InterfaceAudience.Private
 class BufferCallBeforeInitHandler extends ChannelDuplexHandler {
 
+  static final String NAME = "BufferCall";
+
   private enum BufferCallAction {
     FLUSH,
     FAIL
@@ -77,6 +79,11 @@ class BufferCallBeforeInitHandler extends ChannelDuplexHandler {
     }
   }
 
+  @Override
+  public void flush(ChannelHandlerContext ctx) throws Exception {
+    // do not flush anything out
+  }
+
   @Override
   public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
     if (evt instanceof BufferCallEvent) {
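For context on the diff above: the handler's job is to hold outgoing RPC calls (and, with this fix, any flush requests) while the connection is still being set up, and only release them once setup finishes. The sketch below illustrates that buffer-until-init pattern in isolation. It uses the plain io.netty classes rather than HBase's shaded org.apache.hbase.thirdparty packages, and the class and event names (BufferingBeforeInitSketch, ReadySignal) are invented for the example; it is not the HBase implementation.

import java.util.ArrayList;
import java.util.List;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

public class BufferingBeforeInitSketch extends ChannelDuplexHandler {

  /** Illustrative user event meaning "setup finished, release the buffered calls". */
  public static final class ReadySignal {
  }

  private static final class PendingWrite {
    final Object msg;
    final ChannelPromise promise;

    PendingWrite(Object msg, ChannelPromise promise) {
      this.msg = msg;
      this.promise = promise;
    }
  }

  private final List<PendingWrite> pending = new ArrayList<>();

  @Override
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    // Hold the outgoing call instead of passing it further down the pipeline.
    pending.add(new PendingWrite(msg, promise));
  }

  @Override
  public void flush(ChannelHandlerContext ctx) {
    // Swallow flushes while buffering; the default implementation would forward the
    // flush even though nothing has actually been written out yet.
  }

  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof ReadySignal) {
      // Replay the buffered calls, flush them for real, then step out of the pipeline.
      for (PendingWrite w : pending) {
        ctx.write(w.msg, w.promise);
      }
      pending.clear();
      ctx.flush();
      ctx.pipeline().remove(this);
    } else {
      ctx.fireUserEventTriggered(evt);
    }
  }
}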

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java

@@ -48,7 +48,7 @@ import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
-import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
@@ -152,14 +152,14 @@ class NettyRpcConnection extends RpcConnection {
 
   private void established(Channel ch) throws IOException {
     assert eventLoop.inEventLoop();
-    ChannelPipeline p = ch.pipeline();
-    String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name();
-    p.addBefore(addBeforeHandler, null,
-      new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS));
-    p.addBefore(addBeforeHandler, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4));
-    p.addBefore(addBeforeHandler, null,
-      new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor));
-    p.fireUserEventTriggered(BufferCallEvent.success());
+    ch.pipeline()
+      .addBefore(BufferCallBeforeInitHandler.NAME, null,
+        new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS))
+      .addBefore(BufferCallBeforeInitHandler.NAME, null,
+        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4))
+      .addBefore(BufferCallBeforeInitHandler.NAME, null,
+        new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor))
+      .fireUserEventTriggered(BufferCallEvent.success());
   }
 
   private boolean reloginInProgress;
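The established() change above leans on the fact that the buffering handler is now registered under the fixed name BufferCallBeforeInitHandler.NAME, so the real protocol handlers can be chained in front of it with addBefore(name, ...) instead of first resolving its auto-generated name via pipeline().context(BufferCallBeforeInitHandler.class).name(). Below is a minimal sketch of that ordering idea using Netty's EmbeddedChannel and plain io.netty classes; the handler name and the placeholder handlers are invented for the example.

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;

public class PipelineOrderSketch {

  public static void main(String[] args) {
    EmbeddedChannel ch = new EmbeddedChannel();
    // Register the buffering handler under a well-known name at channel setup time.
    ch.pipeline().addLast("BufferCall", new ChannelDuplexHandler());
    // Later, insert the real handlers in front of it by name; passing null as the
    // handler name lets Netty generate one, as the patch does.
    ch.pipeline()
      .addBefore("BufferCall", null, new ChannelInboundHandlerAdapter())
      .addBefore("BufferCall", null, new ChannelDuplexHandler());
    // The two inserted handlers now appear ahead of "BufferCall" in the pipeline order.
    System.out.println(ch.pipeline().names());
    ch.finishAndReleaseAll();
  }
}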
@@ -212,7 +212,8 @@ class NettyRpcConnection extends RpcConnection {
       failInit(ch, e);
       return;
     }
-    ch.pipeline().addFirst(new SaslChallengeDecoder(), saslHandler);
+    ch.pipeline().addBefore(BufferCallBeforeInitHandler.NAME, null, new SaslChallengeDecoder())
+      .addBefore(BufferCallBeforeInitHandler.NAME, null, saslHandler);
     saslPromise.addListener(new FutureListener<Boolean>() {
       @Override
@@ -226,20 +227,22 @@ class NettyRpcConnection extends RpcConnection {
           if (saslHandler.isNeedProcessConnectionHeader()) {
             Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();
             // create the handler to handle the connection header
-            ChannelHandler chHandler = new NettyHBaseRpcConnectionHeaderHandler(
-              connectionHeaderPromise, conf, connectionHeaderWithLength);
+            NettyHBaseRpcConnectionHeaderHandler chHandler =
+              new NettyHBaseRpcConnectionHeaderHandler(connectionHeaderPromise, conf,
+                connectionHeaderWithLength);
             // add ReadTimeoutHandler to deal with server doesn't response connection header
             // because of the different configuration in client side and server side
-            p.addFirst(
-              new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS));
-            p.addLast(chHandler);
+            final String readTimeoutHandlerName = "ReadTimeout";
+            p.addBefore(BufferCallBeforeInitHandler.NAME, readTimeoutHandlerName,
+              new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS))
+              .addBefore(BufferCallBeforeInitHandler.NAME, null, chHandler);
             connectionHeaderPromise.addListener(new FutureListener<Boolean>() {
               @Override
               public void operationComplete(Future<Boolean> future) throws Exception {
                 if (future.isSuccess()) {
                   ChannelPipeline p = ch.pipeline();
-                  p.remove(ReadTimeoutHandler.class);
+                  p.remove(readTimeoutHandlerName);
                   p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
                   // don't send connection header, NettyHbaseRpcConnectionHeaderHandler
                   // sent it already
@@ -273,8 +276,15 @@ class NettyRpcConnection extends RpcConnection {
       .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
       .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
       .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
-      .handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
-      .remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() {
+      .handler(new ChannelInitializer<Channel>() {
+
+        @Override
+        protected void initChannel(Channel ch) throws Exception {
+          ch.pipeline().addLast(BufferCallBeforeInitHandler.NAME,
+            new BufferCallBeforeInitHandler());
+        }
+      }).localAddress(rpcClient.localAddr).remoteAddress(remoteId.address).connect()
+      .addListener(new ChannelFutureListener() {
         @Override
         public void operationComplete(ChannelFuture future) throws Exception {
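One note on the bootstrap change in the last hunk: Bootstrap.handler(...) adds the supplied handler to the pipeline under a name Netty generates, so wrapping BufferCallBeforeInitHandler in a ChannelInitializer appears to be what allows it to be registered explicitly via addLast(BufferCallBeforeInitHandler.NAME, ...), which the addBefore(BufferCallBeforeInitHandler.NAME, ...) calls in the earlier hunks then rely on.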