(cherry picked from commit df4812df65
)
This commit is contained in:
parent
c5b360fd15
commit
6202348502
|
@ -26,6 +26,7 @@ import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||||
|
import io.netty.util.ReferenceCountUtil;
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
|
import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
|
||||||
import org.apache.hadoop.oncrpc.security.VerifierNone;
|
import org.apache.hadoop.oncrpc.security.VerifierNone;
|
||||||
|
@ -163,8 +164,16 @@ public abstract class RpcProgram extends ChannelInboundHandlerAdapter {
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg)
|
public void channelRead(ChannelHandlerContext ctx, Object msg)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
RpcInfo info = (RpcInfo) msg;
|
RpcInfo info = (RpcInfo) msg;
|
||||||
|
try {
|
||||||
|
channelRead(ctx, info);
|
||||||
|
} finally {
|
||||||
|
ReferenceCountUtil.release(info.data());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void channelRead(ChannelHandlerContext ctx, RpcInfo info)
|
||||||
|
throws Exception {
|
||||||
RpcCall call = (RpcCall) info.header();
|
RpcCall call = (RpcCall) info.header();
|
||||||
|
|
||||||
SocketAddress remoteAddress = info.remoteAddress();
|
SocketAddress remoteAddress = info.remoteAddress();
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace(program + " procedure #" + call.getProcedure());
|
LOG.trace(program + " procedure #" + call.getProcedure());
|
||||||
|
@ -256,4 +265,4 @@ public abstract class RpcProgram extends ChannelInboundHandlerAdapter {
|
||||||
public int getPortmapUdpTimeoutMillis() {
|
public int getPortmapUdpTimeoutMillis() {
|
||||||
return portmapUdpTimeoutMillis;
|
return portmapUdpTimeoutMillis;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -129,15 +129,17 @@ public final class RpcUtil {
|
||||||
RpcInfo info = null;
|
RpcInfo info = null;
|
||||||
try {
|
try {
|
||||||
RpcCall callHeader = RpcCall.read(in);
|
RpcCall callHeader = RpcCall.read(in);
|
||||||
ByteBuf dataBuffer = Unpooled.wrappedBuffer(in.buffer()
|
ByteBuf dataBuffer = buf.slice(b.position(), b.remaining());
|
||||||
.slice());
|
|
||||||
|
|
||||||
info = new RpcInfo(callHeader, dataBuffer, ctx, ctx.channel(),
|
info = new RpcInfo(callHeader, dataBuffer, ctx, ctx.channel(),
|
||||||
remoteAddress);
|
remoteAddress);
|
||||||
} catch (Exception exc) {
|
} catch (Exception exc) {
|
||||||
LOG.info("Malformed RPC request from " + remoteAddress);
|
LOG.info("Malformed RPC request from " + remoteAddress);
|
||||||
} finally {
|
} finally {
|
||||||
buf.release();
|
// only release buffer if it is not passed to downstream handler
|
||||||
|
if (info == null) {
|
||||||
|
buf.release();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (info != null) {
|
if (info != null) {
|
||||||
|
|
Loading…
Reference in New Issue