From c5b8aababe18f65f5db979128a62d8a0686b9dc5 Mon Sep 17 00:00:00 2001 From: Ramkrishna Date: Mon, 19 Sep 2016 16:12:15 +0530 Subject: [PATCH] HBASE-16335 RpcClient under heavy load leaks some netty bytebuf (Ram) --- .../hadoop/hbase/ipc/AbstractRpcClient.java | 4 ++ .../hbase/ipc/BlockingRpcConnection.java | 5 +++ .../hadoop/hbase/ipc/NettyRpcConnection.java | 11 +++++ .../hadoop/hbase/ipc/RpcConnection.java | 5 +++ .../hbase/security/SaslWrapHandler.java | 43 ++++++++++++------- 5 files changed, 53 insertions(+), 15 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 401a240619a..990ffe09044 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -215,6 +215,7 @@ public abstract class AbstractRpcClient implements RpcC if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) { LOG.info("Cleanup idle connection to " + conn.remoteId().address); connections.removeValue(conn.remoteId(), conn); + conn.cleanupConnection(); } } } @@ -472,6 +473,9 @@ public abstract class AbstractRpcClient implements RpcC conn.shutdown(); } closeInternal(); + for (T conn : connToClose) { + conn.cleanupConnection(); + } } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java index c8b366d0ec0..528b726c869 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java @@ -684,6 +684,11 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { closeConn(new IOException("connection to " + remoteId.address + " closed")); } + @Override + public void cleanupConnection() { + // do nothing + } + @Override public synchronized void sendRequest(final Call call, HBaseRpcController pcrc) throws IOException { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java index 5f22dfd853a..559b7f9464e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java @@ -36,6 +36,7 @@ import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; @@ -119,6 +120,16 @@ class NettyRpcConnection extends RpcConnection { shutdown0(); } + @Override + public synchronized void cleanupConnection() { + if (connectionHeaderPreamble != null) { + ReferenceCountUtil.safeRelease(connectionHeaderPreamble); + } + if (connectionHeaderWithLength != null) { + ReferenceCountUtil.safeRelease(connectionHeaderWithLength); + } + } + private void established(Channel ch) { ch.write(connectionHeaderWithLength.retainedDuplicate()); ChannelPipeline p = ch.pipeline(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java index 8118b204c05..5e9e97e219d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java @@ -252,4 +252,9 @@ abstract class RpcConnection { public abstract void shutdown(); public abstract void sendRequest(Call call, HBaseRpcController hrc) throws IOException; + + /** + * Does the clean up work after the connection is removed from the connection pool + */ + public abstract void cleanupConnection(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java index ddb4ae9ab85..fefb4f8d6ec 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java @@ -23,6 +23,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import io.netty.channel.CoalescingBufferQueue; +import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.PromiseCombiner; import javax.security.sasl.SaslClient; @@ -60,21 +61,33 @@ public class SaslWrapHandler extends ChannelOutboundHandlerAdapter { @Override public void flush(ChannelHandlerContext ctx) throws Exception { - if (!queue.isEmpty()) { - ChannelPromise promise = ctx.newPromise(); - int readableBytes = queue.readableBytes(); - ByteBuf buf = queue.remove(readableBytes, promise); - byte[] bytes = new byte[readableBytes]; - buf.readBytes(bytes); - byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length); - ChannelPromise lenPromise = ctx.newPromise(); - ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), lenPromise); - ChannelPromise contentPromise = ctx.newPromise(); - ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise); - PromiseCombiner combiner = new PromiseCombiner(); - combiner.addAll(lenPromise, contentPromise); - combiner.finish(promise); + ByteBuf buf = null; + try { + if (!queue.isEmpty()) { + ChannelPromise promise = ctx.newPromise(); + int readableBytes = queue.readableBytes(); + buf = queue.remove(readableBytes, promise); + byte[] bytes = new byte[readableBytes]; + buf.readBytes(bytes); + byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length); + ChannelPromise lenPromise = ctx.newPromise(); + ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), lenPromise); + ChannelPromise contentPromise = ctx.newPromise(); + ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise); + PromiseCombiner combiner = new PromiseCombiner(); + combiner.addAll(lenPromise, contentPromise); + combiner.finish(promise); + } + ctx.flush(); + } finally { + if (buf != null) { + ReferenceCountUtil.safeRelease(buf); + } } - ctx.flush(); + } + + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + queue.releaseAndFailAll(new Throwable("Closed")); } }