HBASE-27180 Fix multiple possible buffer leaks (#4597)
* Fix multiple possible buffer leaks Motivation: When using ByteBuf you need to be very careful about releasing it as otherwise you might leak data. There were various places in the code-base where such a leak could happen. Modifications: - Fix possible buffer leaks - Ensure we call touch(...) so its easier to debug buffer leaks Result: Fix buffer leaks * Formatting * Revert some changes as requested * revert touch * Also release checksum and header buffers Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
22618dadfe
commit
2197b3806b
|
@ -429,6 +429,12 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
|
||||||
future.completeExceptionally(new IOException("stream already broken"));
|
future.completeExceptionally(new IOException("stream already broken"));
|
||||||
// it's the one we have just pushed or just a no-op
|
// it's the one we have just pushed or just a no-op
|
||||||
waitingAckQueue.removeFirst();
|
waitingAckQueue.removeFirst();
|
||||||
|
|
||||||
|
checksumBuf.release();
|
||||||
|
headerBuf.release();
|
||||||
|
|
||||||
|
// This method takes ownership of the dataBuf so we need release it before returning.
|
||||||
|
dataBuf.release();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// TODO: we should perhaps measure time taken per DN here;
|
// TODO: we should perhaps measure time taken per DN here;
|
||||||
|
|
|
@ -662,7 +662,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
|
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
// Release buffer on removal.
|
||||||
cBuf.release();
|
cBuf.release();
|
||||||
cBuf = null;
|
cBuf = null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -185,6 +185,8 @@ public class TestFanOutOneBlockAsyncDFSOutputHang extends AsyncFSTestBase {
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||||
if (!(msg instanceof ByteBuf)) {
|
if (!(msg instanceof ByteBuf)) {
|
||||||
ctx.fireChannelRead(msg);
|
ctx.fireChannelRead(msg);
|
||||||
|
} else {
|
||||||
|
((ByteBuf) msg).release();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in New Issue