HBASE-16335 RpcClient under heavy load leaks some netty bytebuf (Ram)

This commit is contained in:
Ramkrishna 2016-09-19 16:12:15 +05:30
parent 6eb6225456
commit c5b8aababe
5 changed files with 53 additions and 15 deletions

View File

@ -215,6 +215,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
LOG.info("Cleanup idle connection to " + conn.remoteId().address);
connections.removeValue(conn.remoteId(), conn);
conn.cleanupConnection();
}
}
}
@ -472,6 +473,9 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
conn.shutdown();
}
closeInternal();
for (T conn : connToClose) {
conn.cleanupConnection();
}
}
@Override

View File

@ -684,6 +684,11 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
closeConn(new IOException("connection to " + remoteId.address + " closed"));
}
@Override
public void cleanupConnection() {
// do nothing
}
@Override
public synchronized void sendRequest(final Call call, HBaseRpcController pcrc)
throws IOException {

View File

@ -36,6 +36,7 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
@ -119,6 +120,16 @@ class NettyRpcConnection extends RpcConnection {
shutdown0();
}
@Override
public synchronized void cleanupConnection() {
if (connectionHeaderPreamble != null) {
ReferenceCountUtil.safeRelease(connectionHeaderPreamble);
}
if (connectionHeaderWithLength != null) {
ReferenceCountUtil.safeRelease(connectionHeaderWithLength);
}
}
private void established(Channel ch) {
ch.write(connectionHeaderWithLength.retainedDuplicate());
ChannelPipeline p = ch.pipeline();

View File

@ -252,4 +252,9 @@ abstract class RpcConnection {
public abstract void shutdown();
public abstract void sendRequest(Call call, HBaseRpcController hrc) throws IOException;
/**
* Does the clean up work after the connection is removed from the connection pool
*/
public abstract void cleanupConnection();
}

View File

@ -23,6 +23,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.CoalescingBufferQueue;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.PromiseCombiner;
import javax.security.sasl.SaslClient;
@ -60,21 +61,33 @@ public class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
if (!queue.isEmpty()) {
ChannelPromise promise = ctx.newPromise();
int readableBytes = queue.readableBytes();
ByteBuf buf = queue.remove(readableBytes, promise);
byte[] bytes = new byte[readableBytes];
buf.readBytes(bytes);
byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length);
ChannelPromise lenPromise = ctx.newPromise();
ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), lenPromise);
ChannelPromise contentPromise = ctx.newPromise();
ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise);
PromiseCombiner combiner = new PromiseCombiner();
combiner.addAll(lenPromise, contentPromise);
combiner.finish(promise);
ByteBuf buf = null;
try {
if (!queue.isEmpty()) {
ChannelPromise promise = ctx.newPromise();
int readableBytes = queue.readableBytes();
buf = queue.remove(readableBytes, promise);
byte[] bytes = new byte[readableBytes];
buf.readBytes(bytes);
byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length);
ChannelPromise lenPromise = ctx.newPromise();
ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), lenPromise);
ChannelPromise contentPromise = ctx.newPromise();
ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise);
PromiseCombiner combiner = new PromiseCombiner();
combiner.addAll(lenPromise, contentPromise);
combiner.finish(promise);
}
ctx.flush();
} finally {
if (buf != null) {
ReferenceCountUtil.safeRelease(buf);
}
}
ctx.flush();
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
queue.releaseAndFailAll(new Throwable("Closed"));
}
}