diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index 3f688d410be..cf29791fd4d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -70,7 +70,8 @@ public class DFSStripedInputStream extends DFSInputStream { private final int groupSize; /** the buffer for a complete stripe. */ private ByteBuffer curStripeBuf; - private ByteBuffer parityBuf; + @VisibleForTesting + protected ByteBuffer parityBuf; private final ErasureCodingPolicy ecPolicy; private RawErasureDecoder decoder; @@ -129,7 +130,7 @@ public class DFSStripedInputStream extends DFSInputStream { curStripeRange = new StripeRange(0, 0); } - protected ByteBuffer getParityBuffer() { + protected synchronized ByteBuffer getParityBuffer() { if (parityBuf == null) { parityBuf = BUFFER_POOL.getBuffer(useDirectBuffer(), cellSize * parityBlkNum); @@ -554,4 +555,17 @@ public class DFSStripedInputStream extends DFSInputStream { throw new UnsupportedOperationException( "Not support enhanced byte buffer access."); } + + @Override + public synchronized void unbuffer() { + super.unbuffer(); + if (curStripeBuf != null) { + BUFFER_POOL.putBuffer(curStripeBuf); + curStripeBuf = null; + } + if (parityBuf != null) { + BUFFER_POOL.putBuffer(parityBuf); + parityBuf = null; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java index 33556919b69..7973cfa1ec5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java @@ -627,4 +627,41 @@ public class TestDFSStripedInputStream { } } } + + @Test + public void testUnbuffer() throws Exception { + final int numBlocks = 2; + final int fileSize = numBlocks * blockGroupSize; + DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, + stripesPerBlock, false, ecPolicy); + LocatedBlocks lbs = fs.getClient().namenode. + getBlockLocations(filePath.toString(), 0, fileSize); + + for (LocatedBlock lb : lbs.getLocatedBlocks()) { + assert lb instanceof LocatedStripedBlock; + LocatedStripedBlock bg = (LocatedStripedBlock)(lb); + for (int i = 0; i < dataBlocks; i++) { + Block blk = new Block(bg.getBlock().getBlockId() + i, + stripesPerBlock * cellSize, + bg.getBlock().getGenerationStamp()); + blk.setGenerationStamp(bg.getBlock().getGenerationStamp()); + cluster.injectBlocks(i, Arrays.asList(blk), + bg.getBlock().getBlockPoolId()); + } + } + DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(), + filePath.toString(), false, ecPolicy, null); + ByteBuffer readBuffer = ByteBuffer.allocate(fileSize); + int done = 0; + while (done < fileSize) { + int ret = in.read(readBuffer); + assertTrue(ret > 0); + done += ret; + } + in.unbuffer(); + ByteBuffer curStripeBuf = (in.getCurStripeBuf()); + assertNull(curStripeBuf); + assertNull(in.parityBuf); + in.close(); + } }