diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index ea9a0d8920a..a805659fd64 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -550,6 +550,10 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { */ @Override public void recoverAndClose(CancelableProgressable reporter) throws IOException { + if (buf != null) { + buf.release(); + buf = null; + } datanodeList.forEach(ch -> ch.close()); datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly()); endFileLease(client, fileId); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index 0ad3613af2f..48ee6646939 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -88,6 +88,7 @@ public class CallRunner { * Cleanup after ourselves... let go of references. */ private void cleanup() { + this.call.cleanup(); this.call = null; this.rpcServer = null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java index e614c2b9cd4..75997afde09 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java @@ -45,4 +45,30 @@ public class TestCallRunner { cr.setStatus(new MonitoredRPCHandlerImpl()); cr.run(); } + + @Test + public void testCallCleanup() { + RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class); + Mockito.when(mockRpcServer.isStarted()).thenReturn(true); + ServerCall mockCall = Mockito.mock(ServerCall.class); + Mockito.when(mockCall.disconnectSince()).thenReturn(1L); + + CallRunner cr = new CallRunner(mockRpcServer, mockCall); + cr.setStatus(new MonitoredRPCHandlerImpl()); + cr.run(); + Mockito.verify(mockCall, Mockito.times(1)).cleanup(); + } + + @Test + public void testCallRunnerDrop() { + RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class); + Mockito.when(mockRpcServer.isStarted()).thenReturn(true); + ServerCall mockCall = Mockito.mock(ServerCall.class); + Mockito.when(mockCall.disconnectSince()).thenReturn(1L); + + CallRunner cr = new CallRunner(mockRpcServer, mockCall); + cr.setStatus(new MonitoredRPCHandlerImpl()); + cr.drop(); + Mockito.verify(mockCall, Mockito.times(1)).cleanup(); + } }