diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt index 673fbab6abb..f087bb48351 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -382,3 +382,6 @@ HDFS-8202. Improve end to end stirpping file test to add erasure recovering test. (Xinwei Qin via zhz) + + HDFS-8804. Erasure Coding: use DirectBufferPool in DFSStripedInputStream for + buffer allocation. (jing9) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index 1f64d4ed4af..36120635b21 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -39,6 +39,7 @@ import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; +import org.apache.hadoop.util.DirectBufferPool; import java.io.EOFException; import java.io.IOException; @@ -136,6 +137,8 @@ public class DFSStripedInputStream extends DFSInputStream { } } + private static final DirectBufferPool bufferPool = new DirectBufferPool(); + private final BlockReaderInfo[] blockReaders; private final int cellSize; private final short dataBlkNum; @@ -143,6 +146,7 @@ public class DFSStripedInputStream extends DFSInputStream { private final int groupSize; /** the buffer for a complete stripe */ private ByteBuffer curStripeBuf; + private ByteBuffer parityBuf; private final ECSchema schema; private final RawErasureDecoder decoder; @@ -177,12 +181,20 @@ public class DFSStripedInputStream extends DFSInputStream { private void resetCurStripeBuffer() { if (curStripeBuf == null) { - curStripeBuf = ByteBuffer.allocateDirect(cellSize * dataBlkNum); + curStripeBuf = bufferPool.getBuffer(cellSize * dataBlkNum); } curStripeBuf.clear(); curStripeRange = new StripeRange(0, 0); } + private ByteBuffer getParityBuffer() { + if (parityBuf == null) { + parityBuf = bufferPool.getBuffer(cellSize * parityBlkNum); + } + parityBuf.clear(); + return parityBuf; + } + /** * When seeking into a new block group, create blockReader for each internal * block in the group. @@ -204,6 +216,19 @@ public class DFSStripedInputStream extends DFSInputStream { currentLocatedBlock = targetBlockGroup; } + @Override + public synchronized void close() throws IOException { + super.close(); + if (curStripeBuf != null) { + bufferPool.returnBuffer(curStripeBuf); + curStripeBuf = null; + } + if (parityBuf != null) { + bufferPool.returnBuffer(parityBuf); + parityBuf = null; + } + } + /** * Extend the super method with the logic of switching between cells. * When reaching the end of a cell, proceed to the next cell and read it @@ -830,8 +855,10 @@ public class DFSStripedInputStream extends DFSInputStream { } final int decodeIndex = StripedBlockUtil.convertIndex4Decode(index, dataBlkNum, parityBlkNum); - decodeInputs[decodeIndex] = ByteBuffer.allocateDirect( - (int) alignedStripe.range.spanInBlock); + ByteBuffer buf = getParityBuffer().duplicate(); + buf.position(cellSize * decodeIndex); + buf.limit(cellSize * decodeIndex + (int) alignedStripe.range.spanInBlock); + decodeInputs[decodeIndex] = buf.slice(); alignedStripe.chunks[index] = new StripingChunk(decodeInputs[decodeIndex]); return true; }