HDFS-8804. Erasure Coding: use DirectBufferPool in DFSStripedInputStream for buffer allocation. Contributed by Jing Zhao.

This commit is contained in:
Jing Zhao 2015-08-03 17:03:15 -07:00
parent ba90c02853
commit 9312b168e2
2 changed files with 33 additions and 3 deletions

View File

@ -382,3 +382,6 @@
HDFS-8202. Improve end to end stirpping file test to add erasure recovering
test. (Xinwei Qin via zhz)
HDFS-8804. Erasure Coding: use DirectBufferPool in DFSStripedInputStream for
buffer allocation. (jing9)

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.util.DirectBufferPool;
import java.io.EOFException;
import java.io.IOException;
@ -136,6 +137,8 @@ public class DFSStripedInputStream extends DFSInputStream {
}
}
private static final DirectBufferPool bufferPool = new DirectBufferPool();
private final BlockReaderInfo[] blockReaders;
private final int cellSize;
private final short dataBlkNum;
@ -143,6 +146,7 @@ public class DFSStripedInputStream extends DFSInputStream {
private final int groupSize;
/** the buffer for a complete stripe */
private ByteBuffer curStripeBuf;
private ByteBuffer parityBuf;
private final ECSchema schema;
private final RawErasureDecoder decoder;
@ -177,12 +181,20 @@ public class DFSStripedInputStream extends DFSInputStream {
private void resetCurStripeBuffer() {
if (curStripeBuf == null) {
curStripeBuf = ByteBuffer.allocateDirect(cellSize * dataBlkNum);
curStripeBuf = bufferPool.getBuffer(cellSize * dataBlkNum);
}
curStripeBuf.clear();
curStripeRange = new StripeRange(0, 0);
}
private ByteBuffer getParityBuffer() {
if (parityBuf == null) {
parityBuf = bufferPool.getBuffer(cellSize * parityBlkNum);
}
parityBuf.clear();
return parityBuf;
}
/**
* When seeking into a new block group, create blockReader for each internal
* block in the group.
@ -204,6 +216,19 @@ public class DFSStripedInputStream extends DFSInputStream {
currentLocatedBlock = targetBlockGroup;
}
@Override
public synchronized void close() throws IOException {
super.close();
if (curStripeBuf != null) {
bufferPool.returnBuffer(curStripeBuf);
curStripeBuf = null;
}
if (parityBuf != null) {
bufferPool.returnBuffer(parityBuf);
parityBuf = null;
}
}
/**
* Extend the super method with the logic of switching between cells.
* When reaching the end of a cell, proceed to the next cell and read it
@ -830,8 +855,10 @@ public class DFSStripedInputStream extends DFSInputStream {
}
final int decodeIndex = StripedBlockUtil.convertIndex4Decode(index,
dataBlkNum, parityBlkNum);
decodeInputs[decodeIndex] = ByteBuffer.allocateDirect(
(int) alignedStripe.range.spanInBlock);
ByteBuffer buf = getParityBuffer().duplicate();
buf.position(cellSize * decodeIndex);
buf.limit(cellSize * decodeIndex + (int) alignedStripe.range.spanInBlock);
decodeInputs[decodeIndex] = buf.slice();
alignedStripe.chunks[index] = new StripingChunk(decodeInputs[decodeIndex]);
return true;
}