HBASE-27273 Should stop autoRead and skip all the bytes when rpc request too big (#4679)
Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
(cherry picked from commit 486d19e99f
)
This commit is contained in:
parent
a7f8dde815
commit
0a36504848
|
@ -44,6 +44,7 @@ public class NettyRpcFrameDecoder extends ByteToMessageDecoder {
|
||||||
|
|
||||||
private final int maxFrameLength;
|
private final int maxFrameLength;
|
||||||
private boolean requestTooBig;
|
private boolean requestTooBig;
|
||||||
|
private boolean requestTooBigSent;
|
||||||
private String requestTooBigMessage;
|
private String requestTooBigMessage;
|
||||||
|
|
||||||
public NettyRpcFrameDecoder(int maxFrameLength) {
|
public NettyRpcFrameDecoder(int maxFrameLength) {
|
||||||
|
@ -58,8 +59,12 @@ public class NettyRpcFrameDecoder extends ByteToMessageDecoder {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
||||||
|
if (requestTooBigSent) {
|
||||||
|
in.skipBytes(in.readableBytes());
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (requestTooBig) {
|
if (requestTooBig) {
|
||||||
handleTooBigRequest(in);
|
handleTooBigRequest(ctx, in);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,7 +88,7 @@ public class NettyRpcFrameDecoder extends ByteToMessageDecoder {
|
||||||
NettyRpcServer.LOG.warn(requestTooBigMessage);
|
NettyRpcServer.LOG.warn(requestTooBigMessage);
|
||||||
|
|
||||||
if (connection.connectionHeaderRead) {
|
if (connection.connectionHeaderRead) {
|
||||||
handleTooBigRequest(in);
|
handleTooBigRequest(ctx, in);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
ctx.channel().close();
|
ctx.channel().close();
|
||||||
|
@ -101,7 +106,7 @@ public class NettyRpcFrameDecoder extends ByteToMessageDecoder {
|
||||||
out.add(in.readRetainedSlice(frameLengthInt));
|
out.add(in.readRetainedSlice(frameLengthInt));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleTooBigRequest(ByteBuf in) throws IOException {
|
private void handleTooBigRequest(ChannelHandlerContext ctx, ByteBuf in) throws IOException {
|
||||||
in.skipBytes(FRAME_LENGTH_FIELD_LENGTH);
|
in.skipBytes(FRAME_LENGTH_FIELD_LENGTH);
|
||||||
in.markReaderIndex();
|
in.markReaderIndex();
|
||||||
int preIndex = in.readerIndex();
|
int preIndex = in.readerIndex();
|
||||||
|
@ -146,6 +151,10 @@ public class NettyRpcFrameDecoder extends ByteToMessageDecoder {
|
||||||
// instead of calling reqTooBig.sendResponseIfReady()
|
// instead of calling reqTooBig.sendResponseIfReady()
|
||||||
reqTooBig.param = null;
|
reqTooBig.param = null;
|
||||||
connection.channel.writeAndFlush(reqTooBig).addListener(ChannelFutureListener.CLOSE);
|
connection.channel.writeAndFlush(reqTooBig).addListener(ChannelFutureListener.CLOSE);
|
||||||
|
in.skipBytes(in.readableBytes());
|
||||||
|
requestTooBigSent = true;
|
||||||
|
// disable auto read as we do not care newer data from this channel any more
|
||||||
|
ctx.channel().config().setAutoRead(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private RPCProtos.RequestHeader getHeader(ByteBuf in, int headerSize) throws IOException {
|
private RPCProtos.RequestHeader getHeader(ByteBuf in, int headerSize) throws IOException {
|
||||||
|
|
Loading…
Reference in New Issue