HBASE-22370 ByteBuf LEAK ERROR (#720)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
binlijin 2019-10-16 20:41:27 +08:00 committed by Duo Zhang
parent 17495d812d
commit 395cfceb0b
3 changed files with 31 additions and 0 deletions

View File

@ -550,6 +550,10 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
*/
@Override
public void recoverAndClose(CancelableProgressable reporter) throws IOException {
if (buf != null) {
buf.release();
buf = null;
}
datanodeList.forEach(ch -> ch.close());
datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
endFileLease(client, fileId);

View File

@ -88,6 +88,7 @@ public class CallRunner {
* Cleanup after ourselves... let go of references.
*/
private void cleanup() {
this.call.cleanup();
this.call = null;
this.rpcServer = null;
}

View File

@ -45,4 +45,30 @@ public class TestCallRunner {
cr.setStatus(new MonitoredRPCHandlerImpl());
cr.run();
}
@Test
public void testCallCleanup() {
RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class);
Mockito.when(mockRpcServer.isStarted()).thenReturn(true);
ServerCall mockCall = Mockito.mock(ServerCall.class);
Mockito.when(mockCall.disconnectSince()).thenReturn(1L);
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
cr.setStatus(new MonitoredRPCHandlerImpl());
cr.run();
Mockito.verify(mockCall, Mockito.times(1)).cleanup();
}
@Test
public void testCallRunnerDrop() {
RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class);
Mockito.when(mockRpcServer.isStarted()).thenReturn(true);
ServerCall mockCall = Mockito.mock(ServerCall.class);
Mockito.when(mockCall.disconnectSince()).thenReturn(1L);
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
cr.setStatus(new MonitoredRPCHandlerImpl());
cr.drop();
Mockito.verify(mockCall, Mockito.times(1)).cleanup();
}
}