HBASE-27180 Fix multiple possible buffer leaks (#4597)
* Fix multiple possible buffer leaks
Motivation:
When using ByteBuf you need to be very careful about releasing it as otherwise you might leak data. There were various places in the code-base where such a leak could happen.
Modifications:
- Fix possible buffer leaks
- Ensure we call touch(...) so its easier to debug buffer leaks
Result:
Fix buffer leaks
* Formatting
* Revert some changes as requested
* revert touch
* Also release checksum and header buffers
Signed-off-by: Duo Zhang <zhangduo@apache.org>
(cherry picked from commit 2197b3806b
)
This commit is contained in:
parent
decb96edd2
commit
ff710b8252
|
@ -412,6 +412,12 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
|
||||||
future.completeExceptionally(new IOException("stream already broken"));
|
future.completeExceptionally(new IOException("stream already broken"));
|
||||||
// it's the one we have just pushed or just a no-op
|
// it's the one we have just pushed or just a no-op
|
||||||
waitingAckQueue.removeFirst();
|
waitingAckQueue.removeFirst();
|
||||||
|
|
||||||
|
checksumBuf.release();
|
||||||
|
headerBuf.release();
|
||||||
|
|
||||||
|
// This method takes ownership of the dataBuf so we need release it before returning.
|
||||||
|
dataBuf.release();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
datanodeList.forEach(ch -> {
|
datanodeList.forEach(ch -> {
|
||||||
|
|
|
@ -662,7 +662,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
|
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
// Release buffer on removal.
|
||||||
cBuf.release();
|
cBuf.release();
|
||||||
cBuf = null;
|
cBuf = null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -164,6 +164,8 @@ public class TestFanOutOneBlockAsyncDFSOutputHang extends AsyncFSTestBase {
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||||
if (!(msg instanceof ByteBuf)) {
|
if (!(msg instanceof ByteBuf)) {
|
||||||
ctx.fireChannelRead(msg);
|
ctx.fireChannelRead(msg);
|
||||||
|
} else {
|
||||||
|
((ByteBuf) msg).release();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in New Issue