HDFS-14308. DFSStripedInputStream curStripeBuf is not freed by unbuffer() (#1667)
Reviewed-by: Aravindan Vijayan <avijayan@cloudera.com>
Reviewed-by: Wei-Chiu Chuang <weichiu@apache.org>
(cherry picked from commit 30db895b59
)
This commit is contained in:
parent
fa6b27ea8d
commit
9316ca149f
|
@ -70,7 +70,8 @@ public class DFSStripedInputStream extends DFSInputStream {
|
|||
private final int groupSize;
|
||||
/** the buffer for a complete stripe. */
|
||||
private ByteBuffer curStripeBuf;
|
||||
private ByteBuffer parityBuf;
|
||||
@VisibleForTesting
|
||||
protected ByteBuffer parityBuf;
|
||||
private final ErasureCodingPolicy ecPolicy;
|
||||
private RawErasureDecoder decoder;
|
||||
|
||||
|
@ -129,7 +130,7 @@ public class DFSStripedInputStream extends DFSInputStream {
|
|||
curStripeRange = new StripeRange(0, 0);
|
||||
}
|
||||
|
||||
protected ByteBuffer getParityBuffer() {
|
||||
protected synchronized ByteBuffer getParityBuffer() {
|
||||
if (parityBuf == null) {
|
||||
parityBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
|
||||
cellSize * parityBlkNum);
|
||||
|
@ -554,4 +555,17 @@ public class DFSStripedInputStream extends DFSInputStream {
|
|||
throw new UnsupportedOperationException(
|
||||
"Not support enhanced byte buffer access.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void unbuffer() {
|
||||
super.unbuffer();
|
||||
if (curStripeBuf != null) {
|
||||
BUFFER_POOL.putBuffer(curStripeBuf);
|
||||
curStripeBuf = null;
|
||||
}
|
||||
if (parityBuf != null) {
|
||||
BUFFER_POOL.putBuffer(parityBuf);
|
||||
parityBuf = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -627,4 +627,41 @@ public class TestDFSStripedInputStream {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnbuffer() throws Exception {
|
||||
final int numBlocks = 2;
|
||||
final int fileSize = numBlocks * blockGroupSize;
|
||||
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
|
||||
stripesPerBlock, false, ecPolicy);
|
||||
LocatedBlocks lbs = fs.getClient().namenode.
|
||||
getBlockLocations(filePath.toString(), 0, fileSize);
|
||||
|
||||
for (LocatedBlock lb : lbs.getLocatedBlocks()) {
|
||||
assert lb instanceof LocatedStripedBlock;
|
||||
LocatedStripedBlock bg = (LocatedStripedBlock)(lb);
|
||||
for (int i = 0; i < dataBlocks; i++) {
|
||||
Block blk = new Block(bg.getBlock().getBlockId() + i,
|
||||
stripesPerBlock * cellSize,
|
||||
bg.getBlock().getGenerationStamp());
|
||||
blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
|
||||
cluster.injectBlocks(i, Arrays.asList(blk),
|
||||
bg.getBlock().getBlockPoolId());
|
||||
}
|
||||
}
|
||||
DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(),
|
||||
filePath.toString(), false, ecPolicy, null);
|
||||
ByteBuffer readBuffer = ByteBuffer.allocate(fileSize);
|
||||
int done = 0;
|
||||
while (done < fileSize) {
|
||||
int ret = in.read(readBuffer);
|
||||
assertTrue(ret > 0);
|
||||
done += ret;
|
||||
}
|
||||
in.unbuffer();
|
||||
ByteBuffer curStripeBuf = (in.getCurStripeBuf());
|
||||
assertNull(curStripeBuf);
|
||||
assertNull(in.parityBuf);
|
||||
in.close();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue