From b9093630380d316e31220a6c19d16cbfb89d1d65 Mon Sep 17 00:00:00 2001 From: binlijin Date: Wed, 16 Oct 2019 20:41:27 +0800 Subject: [PATCH] HBASE-22370 ByteBuf LEAK ERROR (#720) Signed-off-by: Duo Zhang --- .../asyncfs/FanOutOneBlockAsyncDFSOutput.java | 4 +++ .../apache/hadoop/hbase/ipc/CallRunner.java | 1 + .../hadoop/hbase/ipc/TestCallRunner.java | 26 +++++++++++++++++++ 3 files changed, 31 insertions(+) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index 1645d68be67..7a429c2a4c2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -549,6 +549,10 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { */ @Override public void recoverAndClose(CancelableProgressable reporter) throws IOException { + if (buf != null) { + buf.release(); + buf = null; + } datanodeList.forEach(ch -> ch.close()); datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly()); endFileLease(client, fileId); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index 0ad3613af2f..48ee6646939 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -88,6 +88,7 @@ public class CallRunner { * Cleanup after ourselves... let go of references. */ private void cleanup() { + this.call.cleanup(); this.call = null; this.rpcServer = null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java index e614c2b9cd4..75997afde09 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java @@ -45,4 +45,30 @@ public class TestCallRunner { cr.setStatus(new MonitoredRPCHandlerImpl()); cr.run(); } + + @Test + public void testCallCleanup() { + RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class); + Mockito.when(mockRpcServer.isStarted()).thenReturn(true); + ServerCall mockCall = Mockito.mock(ServerCall.class); + Mockito.when(mockCall.disconnectSince()).thenReturn(1L); + + CallRunner cr = new CallRunner(mockRpcServer, mockCall); + cr.setStatus(new MonitoredRPCHandlerImpl()); + cr.run(); + Mockito.verify(mockCall, Mockito.times(1)).cleanup(); + } + + @Test + public void testCallRunnerDrop() { + RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class); + Mockito.when(mockRpcServer.isStarted()).thenReturn(true); + ServerCall mockCall = Mockito.mock(ServerCall.class); + Mockito.when(mockCall.disconnectSince()).thenReturn(1L); + + CallRunner cr = new CallRunner(mockRpcServer, mockCall); + cr.setStatus(new MonitoredRPCHandlerImpl()); + cr.drop(); + Mockito.verify(mockCall, Mockito.times(1)).cleanup(); + } }