diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java index 551d1d3fb40..b60cad10189 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java @@ -46,6 +46,7 @@ class NettyRpcFrameDecoder extends ByteToMessageDecoder { final NettyServerRpcConnection connection; private boolean requestTooBig; + private boolean requestTooBigSent; private String requestTooBigMessage; public NettyRpcFrameDecoder(int maxFrameLength, NettyServerRpcConnection connection) { @@ -55,8 +56,12 @@ class NettyRpcFrameDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + if (requestTooBigSent) { + in.skipBytes(in.readableBytes()); + return; + } if (requestTooBig) { - handleTooBigRequest(in); + handleTooBigRequest(ctx, in); return; } @@ -80,7 +85,7 @@ class NettyRpcFrameDecoder extends ByteToMessageDecoder { NettyRpcServer.LOG.warn(requestTooBigMessage); if (connection.connectionHeaderRead) { - handleTooBigRequest(in); + handleTooBigRequest(ctx, in); return; } ctx.channel().close(); @@ -98,7 +103,7 @@ class NettyRpcFrameDecoder extends ByteToMessageDecoder { out.add(in.readRetainedSlice(frameLengthInt)); } - private void handleTooBigRequest(ByteBuf in) throws IOException { + private void handleTooBigRequest(ChannelHandlerContext ctx, ByteBuf in) throws IOException { in.skipBytes(FRAME_LENGTH_FIELD_LENGTH); in.markReaderIndex(); int preIndex = in.readerIndex(); @@ -143,6 +148,10 @@ class NettyRpcFrameDecoder extends ByteToMessageDecoder { // instead of calling reqTooBig.sendResponseIfReady() reqTooBig.param = null; connection.channel.writeAndFlush(reqTooBig).addListener(ChannelFutureListener.CLOSE); + in.skipBytes(in.readableBytes()); + requestTooBigSent = true; + // disable auto read as we do not care newer data from this channel any more + ctx.channel().config().setAutoRead(false); } private RPCProtos.RequestHeader getHeader(ByteBuf in, int headerSize) throws IOException {