From 2e7a98392375b27fef80fad6e20c30a652272df4 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Sat, 6 Aug 2022 20:08:07 +0800 Subject: [PATCH] HBASE-27273 Should stop autoRead and skip all the bytes when rpc request too big (#4679) Signed-off-by: Xiaolin Ha (cherry picked from commit 486d19e99ff4370bc60e0db235508198c84a00e3) --- .../hadoop/hbase/ipc/NettyRpcFrameDecoder.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java index 551d1d3fb40..b60cad10189 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java @@ -46,6 +46,7 @@ class NettyRpcFrameDecoder extends ByteToMessageDecoder { final NettyServerRpcConnection connection; private boolean requestTooBig; + private boolean requestTooBigSent; private String requestTooBigMessage; public NettyRpcFrameDecoder(int maxFrameLength, NettyServerRpcConnection connection) { @@ -55,8 +56,12 @@ class NettyRpcFrameDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + if (requestTooBigSent) { + in.skipBytes(in.readableBytes()); + return; + } if (requestTooBig) { - handleTooBigRequest(in); + handleTooBigRequest(ctx, in); return; } @@ -80,7 +85,7 @@ class NettyRpcFrameDecoder extends ByteToMessageDecoder { NettyRpcServer.LOG.warn(requestTooBigMessage); if (connection.connectionHeaderRead) { - handleTooBigRequest(in); + handleTooBigRequest(ctx, in); return; } ctx.channel().close(); @@ -98,7 +103,7 @@ class NettyRpcFrameDecoder extends ByteToMessageDecoder { out.add(in.readRetainedSlice(frameLengthInt)); } - private void handleTooBigRequest(ByteBuf in) throws IOException { + private void handleTooBigRequest(ChannelHandlerContext ctx, ByteBuf in) throws IOException { in.skipBytes(FRAME_LENGTH_FIELD_LENGTH); in.markReaderIndex(); int preIndex = in.readerIndex(); @@ -143,6 +148,10 @@ class NettyRpcFrameDecoder extends ByteToMessageDecoder { // instead of calling reqTooBig.sendResponseIfReady() reqTooBig.param = null; connection.channel.writeAndFlush(reqTooBig).addListener(ChannelFutureListener.CLOSE); + in.skipBytes(in.readableBytes()); + requestTooBigSent = true; + // disable auto read as we do not care newer data from this channel any more + ctx.channel().config().setAutoRead(false); } private RPCProtos.RequestHeader getHeader(ByteBuf in, int headerSize) throws IOException {