HBASE-27273 Should stop autoRead and skip all the bytes when rpc request too big (#4679)
Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
(cherry picked from commit 486d19e99f
)
This commit is contained in:
parent
c5b22d973b
commit
2e7a983923
|
@ -46,6 +46,7 @@ class NettyRpcFrameDecoder extends ByteToMessageDecoder {
|
|||
final NettyServerRpcConnection connection;
|
||||
|
||||
private boolean requestTooBig;
|
||||
private boolean requestTooBigSent;
|
||||
private String requestTooBigMessage;
|
||||
|
||||
public NettyRpcFrameDecoder(int maxFrameLength, NettyServerRpcConnection connection) {
|
||||
|
@ -55,8 +56,12 @@ class NettyRpcFrameDecoder extends ByteToMessageDecoder {
|
|||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
||||
if (requestTooBigSent) {
|
||||
in.skipBytes(in.readableBytes());
|
||||
return;
|
||||
}
|
||||
if (requestTooBig) {
|
||||
handleTooBigRequest(in);
|
||||
handleTooBigRequest(ctx, in);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -80,7 +85,7 @@ class NettyRpcFrameDecoder extends ByteToMessageDecoder {
|
|||
NettyRpcServer.LOG.warn(requestTooBigMessage);
|
||||
|
||||
if (connection.connectionHeaderRead) {
|
||||
handleTooBigRequest(in);
|
||||
handleTooBigRequest(ctx, in);
|
||||
return;
|
||||
}
|
||||
ctx.channel().close();
|
||||
|
@ -98,7 +103,7 @@ class NettyRpcFrameDecoder extends ByteToMessageDecoder {
|
|||
out.add(in.readRetainedSlice(frameLengthInt));
|
||||
}
|
||||
|
||||
private void handleTooBigRequest(ByteBuf in) throws IOException {
|
||||
private void handleTooBigRequest(ChannelHandlerContext ctx, ByteBuf in) throws IOException {
|
||||
in.skipBytes(FRAME_LENGTH_FIELD_LENGTH);
|
||||
in.markReaderIndex();
|
||||
int preIndex = in.readerIndex();
|
||||
|
@ -143,6 +148,10 @@ class NettyRpcFrameDecoder extends ByteToMessageDecoder {
|
|||
// instead of calling reqTooBig.sendResponseIfReady()
|
||||
reqTooBig.param = null;
|
||||
connection.channel.writeAndFlush(reqTooBig).addListener(ChannelFutureListener.CLOSE);
|
||||
in.skipBytes(in.readableBytes());
|
||||
requestTooBigSent = true;
|
||||
// disable auto read as we do not care newer data from this channel any more
|
||||
ctx.channel().config().setAutoRead(false);
|
||||
}
|
||||
|
||||
private RPCProtos.RequestHeader getHeader(ByteBuf in, int headerSize) throws IOException {
|
||||
|
|
Loading…
Reference in New Issue