HDFS-13540. DFSStripedInputStream should only allocate new buffers when reading. Contributed by Xiao Chen.

(cherry picked from commit 34e8b9f9a8)
(cherry picked from commit 7d71b3a1cc)
This commit is contained in:
Sammi Chen 2018-05-23 19:10:09 +08:00
parent bd98d4e77c
commit 6db710b9d8
3 changed files with 64 additions and 5 deletions

View File

@ -116,4 +116,16 @@ public final class ElasticByteBufferPool implements ByteBufferPool {
// poor granularity.
}
}
/**
* Get the size of the buffer pool, for the specified buffer type.
*
* @param direct Whether the size is returned for direct buffers
* @return The size
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public int size(boolean direct) {
return getBufferTree(direct).size();
}
}

View File

@ -114,12 +114,14 @@ public class DFSStripedInputStream extends DFSInputStream {
return decoder.preferDirectBuffer();
}
void resetCurStripeBuffer() {
if (curStripeBuf == null) {
private void resetCurStripeBuffer(boolean shouldAllocateBuf) {
if (shouldAllocateBuf && curStripeBuf == null) {
curStripeBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
cellSize * dataBlkNum);
}
curStripeBuf.clear();
if (curStripeBuf != null) {
curStripeBuf.clear();
}
curStripeRange = new StripeRange(0, 0);
}
@ -204,7 +206,7 @@ public class DFSStripedInputStream extends DFSInputStream {
*/
@Override
protected void closeCurrentBlockReaders() {
resetCurStripeBuffer();
resetCurStripeBuffer(false);
if (blockReaders == null || blockReaders.length == 0) {
return;
}
@ -294,7 +296,7 @@ public class DFSStripedInputStream extends DFSInputStream {
*/
private void readOneStripe(CorruptedBlocks corruptedBlocks)
throws IOException {
resetCurStripeBuffer();
resetCurStripeBuffer(true);
// compute stripe range based on pos
final long offsetInBlockGroup = getOffsetInBlockGroup();

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
@ -529,4 +530,48 @@ public class TestDFSStripedInputStream {
}
}
}
@Test
public void testCloseDoesNotAllocateNewBuffer() throws Exception {
final int numBlocks = 2;
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
stripesPerBlock, false, ecPolicy);
try (DFSInputStream in = fs.getClient().open(filePath.toString())) {
assertTrue(in instanceof DFSStripedInputStream);
final DFSStripedInputStream stream = (DFSStripedInputStream) in;
final ElasticByteBufferPool ebbp =
(ElasticByteBufferPool) stream.getBufferPool();
// first clear existing pool
LOG.info("Current pool size: direct: " + ebbp.size(true) + ", indirect: "
+ ebbp.size(false));
emptyBufferPoolForCurrentPolicy(ebbp, true);
emptyBufferPoolForCurrentPolicy(ebbp, false);
final int startSizeDirect = ebbp.size(true);
final int startSizeIndirect = ebbp.size(false);
// close should not allocate new buffers in the pool.
stream.close();
assertEquals(startSizeDirect, ebbp.size(true));
assertEquals(startSizeIndirect, ebbp.size(false));
}
}
/**
* Empties the pool for the specified buffer type, for the current ecPolicy.
* <p>
* Note that {@link #ecPolicy} may change for difference test cases in
* {@link TestDFSStripedInputStreamWithRandomECPolicy}.
*/
private void emptyBufferPoolForCurrentPolicy(ElasticByteBufferPool ebbp,
boolean direct) {
int size;
while ((size = ebbp.size(direct)) != 0) {
ebbp.getBuffer(direct,
ecPolicy.getCellSize() * ecPolicy.getNumDataUnits());
if (size == ebbp.size(direct)) {
// if getBuffer didn't decrease size, it means the pool for the buffer
// corresponding to current ecPolicy is empty
break;
}
}
}
}