diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java index 023f37f9b41..9dd7771fd57 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java @@ -116,4 +116,16 @@ public final class ElasticByteBufferPool implements ByteBufferPool { // poor granularity. } } + + /** + * Get the size of the buffer pool, for the specified buffer type. + * + * @param direct Whether the size is returned for direct buffers + * @return The size + */ + @InterfaceAudience.Private + @InterfaceStability.Unstable + public int size(boolean direct) { + return getBufferTree(direct).size(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index f3b16e09812..5557a502cef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -116,12 +116,14 @@ public class DFSStripedInputStream extends DFSInputStream { return decoder.preferDirectBuffer(); } - void resetCurStripeBuffer() { - if (curStripeBuf == null) { + private void resetCurStripeBuffer(boolean shouldAllocateBuf) { + if (shouldAllocateBuf && curStripeBuf == null) { curStripeBuf = BUFFER_POOL.getBuffer(useDirectBuffer(), cellSize * dataBlkNum); } - curStripeBuf.clear(); + if (curStripeBuf != null) { + curStripeBuf.clear(); + } curStripeRange = new StripeRange(0, 0); } @@ -206,7 +208,7 @@ public class DFSStripedInputStream extends DFSInputStream { */ @Override protected void closeCurrentBlockReaders() { - resetCurStripeBuffer(); + resetCurStripeBuffer(false); if (blockReaders == null || blockReaders.length == 0) { return; } @@ -296,7 +298,7 @@ public class DFSStripedInputStream extends DFSInputStream { */ private void readOneStripe(CorruptedBlocks corruptedBlocks) throws IOException { - resetCurStripeBuffer(); + resetCurStripeBuffer(true); // compute stripe range based on pos final long offsetInBlockGroup = getOffsetInBlockGroup(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java index cdebee0dc8d..422746e33c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.io.ElasticByteBufferPool; import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ErasureCodeNative; import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; @@ -529,4 +530,48 @@ public class TestDFSStripedInputStream { } } } + + @Test + public void testCloseDoesNotAllocateNewBuffer() throws Exception { + final int numBlocks = 2; + DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, + stripesPerBlock, false, ecPolicy); + try (DFSInputStream in = fs.getClient().open(filePath.toString())) { + assertTrue(in instanceof DFSStripedInputStream); + final DFSStripedInputStream stream = (DFSStripedInputStream) in; + final ElasticByteBufferPool ebbp = + (ElasticByteBufferPool) stream.getBufferPool(); + // first clear existing pool + LOG.info("Current pool size: direct: " + ebbp.size(true) + ", indirect: " + + ebbp.size(false)); + emptyBufferPoolForCurrentPolicy(ebbp, true); + emptyBufferPoolForCurrentPolicy(ebbp, false); + final int startSizeDirect = ebbp.size(true); + final int startSizeIndirect = ebbp.size(false); + // close should not allocate new buffers in the pool. + stream.close(); + assertEquals(startSizeDirect, ebbp.size(true)); + assertEquals(startSizeIndirect, ebbp.size(false)); + } + } + + /** + * Empties the pool for the specified buffer type, for the current ecPolicy. + *
+ * Note that {@link #ecPolicy} may change for difference test cases in + * {@link TestDFSStripedInputStreamWithRandomECPolicy}. + */ + private void emptyBufferPoolForCurrentPolicy(ElasticByteBufferPool ebbp, + boolean direct) { + int size; + while ((size = ebbp.size(direct)) != 0) { + ebbp.getBuffer(direct, + ecPolicy.getCellSize() * ecPolicy.getNumDataUnits()); + if (size == ebbp.size(direct)) { + // if getBuffer didn't decrease size, it means the pool for the buffer + // corresponding to current ecPolicy is empty + break; + } + } + } }