HBASE-27273 Should stop autoRead and skip all the bytes when rpc request too big (#4679)

Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
This commit is contained in:
Duo Zhang 2022-08-06 20:08:07 +08:00 committed by GitHub
parent 2ac5578730
commit 486d19e99f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 12 additions and 3 deletions

View File

@ -46,6 +46,7 @@ class NettyRpcFrameDecoder extends ByteToMessageDecoder {
final NettyServerRpcConnection connection; final NettyServerRpcConnection connection;
private boolean requestTooBig; private boolean requestTooBig;
private boolean requestTooBigSent;
private String requestTooBigMessage; private String requestTooBigMessage;
public NettyRpcFrameDecoder(int maxFrameLength, NettyServerRpcConnection connection) { public NettyRpcFrameDecoder(int maxFrameLength, NettyServerRpcConnection connection) {
@ -55,8 +56,12 @@ class NettyRpcFrameDecoder extends ByteToMessageDecoder {
@Override @Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (requestTooBigSent) {
in.skipBytes(in.readableBytes());
return;
}
if (requestTooBig) { if (requestTooBig) {
handleTooBigRequest(in); handleTooBigRequest(ctx, in);
return; return;
} }
@ -80,7 +85,7 @@ class NettyRpcFrameDecoder extends ByteToMessageDecoder {
NettyRpcServer.LOG.warn(requestTooBigMessage); NettyRpcServer.LOG.warn(requestTooBigMessage);
if (connection.connectionHeaderRead) { if (connection.connectionHeaderRead) {
handleTooBigRequest(in); handleTooBigRequest(ctx, in);
return; return;
} }
ctx.channel().close(); ctx.channel().close();
@ -98,7 +103,7 @@ class NettyRpcFrameDecoder extends ByteToMessageDecoder {
out.add(in.readRetainedSlice(frameLengthInt)); out.add(in.readRetainedSlice(frameLengthInt));
} }
private void handleTooBigRequest(ByteBuf in) throws IOException { private void handleTooBigRequest(ChannelHandlerContext ctx, ByteBuf in) throws IOException {
in.skipBytes(FRAME_LENGTH_FIELD_LENGTH); in.skipBytes(FRAME_LENGTH_FIELD_LENGTH);
in.markReaderIndex(); in.markReaderIndex();
int preIndex = in.readerIndex(); int preIndex = in.readerIndex();
@ -143,6 +148,10 @@ class NettyRpcFrameDecoder extends ByteToMessageDecoder {
// instead of calling reqTooBig.sendResponseIfReady() // instead of calling reqTooBig.sendResponseIfReady()
reqTooBig.param = null; reqTooBig.param = null;
connection.channel.writeAndFlush(reqTooBig).addListener(ChannelFutureListener.CLOSE); connection.channel.writeAndFlush(reqTooBig).addListener(ChannelFutureListener.CLOSE);
in.skipBytes(in.readableBytes());
requestTooBigSent = true;
// disable auto read as we do not care newer data from this channel any more
ctx.channel().config().setAutoRead(false);
} }
private RPCProtos.RequestHeader getHeader(ByteBuf in, int headerSize) throws IOException { private RPCProtos.RequestHeader getHeader(ByteBuf in, int headerSize) throws IOException {