HDFS-8281. Erasure Coding: implement parallel stateful reading for striped layout. Contributed by Jing Zhao.

Jing Zhao 2015-05-04 14:44:58 -07:00 committed by Zhe Zhang
parent ea6c66ed57
commit 6dea01f1ee
6 changed files with 246 additions and 88 deletions

View File: CHANGES-HDFS-EC-7285.txt

@ -161,3 +161,6 @@
HDFS-8316. Erasure coding: refactor EC constants to be consistent with HDFS-8249.
(Zhe Zhang via jing9)
HDFS-8281. Erasure Coding: implement parallel stateful reading for striped layout.
(jing9)

View File: DFSInputStream.java

@ -717,6 +717,16 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
interface ReaderStrategy {
public int doRead(BlockReader blockReader, int off, int len)
throws ChecksumException, IOException;
/**
* Copy data from the src ByteBuffer into the read buffer.
* @param src The src buffer where the data is copied from
* @param offset Useful only when the ReaderStrategy is based on a byte array.
* Indicates the offset into the byte array at which to copy.
* @param length Useful only when the ReaderStrategy is based on a byte array.
* Indicates the length of the data to copy.
*/
public int copyFrom(ByteBuffer src, int offset, int length);
}
protected void updateReadStatistics(ReadStatistics readStatistics,
@ -750,6 +760,13 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
updateReadStatistics(readStatistics, nRead, blockReader);
return nRead;
}
@Override
public int copyFrom(ByteBuffer src, int offset, int length) {
ByteBuffer writeSlice = src.duplicate();
writeSlice.get(buf, offset, length);
return length;
}
}
/**
@ -783,6 +800,15 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
}
}
@Override
public int copyFrom(ByteBuffer src, int offset, int length) {
ByteBuffer writeSlice = src.duplicate();
int remaining = Math.min(buf.remaining(), writeSlice.remaining());
writeSlice.limit(writeSlice.position() + remaining);
buf.put(writeSlice);
return remaining;
}
}
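Both copyFrom implementations above read through a duplicate() of the source buffer, so draining the copied region does not disturb the original buffer's position or limit. A minimal standalone sketch of that ByteBuffer idiom (class name and sample data are illustrative only):

import java.nio.ByteBuffer;

public class DuplicateCopyDemo {
  public static void main(String[] args) {
    ByteBuffer src = ByteBuffer.wrap("stripe-data".getBytes());

    // duplicate() shares the backing bytes but carries its own
    // position/limit, so draining the view leaves src's markers alone.
    ByteBuffer view = src.duplicate();
    byte[] dst = new byte[6];
    view.get(dst, 0, dst.length);

    System.out.println(new String(dst));   // prints "stripe"
    System.out.println(src.position());    // prints 0
  }
}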
/* This is used by regular read() and handles ChecksumExceptions.

View File: DFSStripedInputStream.java

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.*;
@ -37,6 +38,7 @@ import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CancellationException;
@ -62,7 +64,7 @@ import java.util.concurrent.Future;
* +------+ <- A cell contains {@link #cellSize} bytes of data
*
* Three styles of read will eventually be supported:
* 1. Stateful read: TODO: HDFS-8033
* 1. Stateful read
* 2. pread without decode support
* This is implemented by calculating the portion of read from each block and
* issuing requests to each DataNode in parallel.
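As background for the cell arithmetic this class uses, a hedged sketch of how a logical offset inside a block group maps to an internal block under round-robin striping (the helper below is illustrative, not part of the patch):

/** Illustrative only: locate a block-group offset in the striped layout. */
final class StripeMath {
  static void locate(long offsetInGroup, int cellSize, int dataBlkNum) {
    long cellIdxInGroup = offsetInGroup / cellSize;           // which cell overall
    int blkIdxInGroup = (int) (cellIdxInGroup % dataBlkNum);  // which data block
    long cellIdxInBlk = cellIdxInGroup / dataBlkNum;          // which cell within that block
    long offsetInBlk = cellIdxInBlk * cellSize + offsetInGroup % cellSize;
    System.out.println("block " + blkIdxInGroup + ", offset " + offsetInBlk);
  }

  public static void main(String[] args) {
    // With cellSize = 4 and 3 data blocks, offset 10 falls in cell 2,
    // i.e. internal block 2 at offset 2 within that block.
    locate(10, 4, 3);
  }
}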
@ -91,12 +93,38 @@ public class DFSStripedInputStream extends DFSInputStream {
}
}
/** Used to indicate the buffered data's range in the block group */
private static class StripeRange {
/** start offset in the block group (inclusive) */
final long offsetInBlock;
/** length of the stripe range */
final long length;
StripeRange(long offsetInBlock, long length) {
Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
this.offsetInBlock = offsetInBlock;
this.length = length;
}
boolean include(long pos) {
return pos >= offsetInBlock && pos < offsetInBlock + length;
}
}
private final short groupSize = HdfsConstants.NUM_DATA_BLOCKS;
private final BlockReader[] blockReaders = new BlockReader[groupSize];
private final DatanodeInfo[] currentNodes = new DatanodeInfo[groupSize];
private final int cellSize;
private final short dataBlkNum;
private final short parityBlkNum;
/** the buffer for a complete stripe */
private ByteBuffer curStripeBuf;
/**
* Indicates the start/end offset of the current buffered stripe in the
* block group
*/
private StripeRange curStripeRange;
private final CompletionService<Integer> readingService;
DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
ECInfo ecInfo) throws IOException {
@ -106,7 +134,20 @@ public class DFSStripedInputStream extends DFSInputStream {
cellSize = ecInfo.getSchema().getChunkSize();
dataBlkNum = (short) ecInfo.getSchema().getNumDataUnits();
parityBlkNum = (short) ecInfo.getSchema().getNumParityUnits();
DFSClient.LOG.debug("Creating an striped input stream for file " + src);
curStripeRange = new StripeRange(0, 0);
readingService =
new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Creating an striped input stream for file " + src);
}
}
private void resetCurStripeBuffer() {
if (curStripeBuf == null) {
curStripeBuf = ByteBuffer.allocateDirect(cellSize * dataBlkNum);
}
curStripeBuf.clear();
curStripeRange = new StripeRange(0, 0);
}
@Override
@ -141,7 +182,7 @@ public class DFSStripedInputStream extends DFSInputStream {
targetBlockGroup.getBlockSize() - 1;
currentLocatedBlock = targetBlockGroup;
long offsetIntoBlockGroup = target - targetBlockGroup.getStartOffset();
final long offsetIntoBlockGroup = getOffsetInBlockGroup();
LocatedBlock[] targetBlocks = StripedBlockUtil.parseStripedBlockGroup(
targetBlockGroup, cellSize, dataBlkNum, parityBlkNum);
// The purpose is to get start offset into each block
@ -156,8 +197,8 @@ public class DFSStripedInputStream extends DFSInputStream {
if (retval != null) {
currentNodes[i] = retval.info;
blockReaders[i] = getBlockReaderWithRetry(targetBlock,
readPortions[i].startOffsetInBlock,
targetBlock.getBlockSize() - readPortions[i].startOffsetInBlock,
readPortions[i].getStartOffsetInBlock(),
targetBlock.getBlockSize() - readPortions[i].getStartOffsetInBlock(),
retval.addr, retval.storageType, retval.info, target, retry);
}
}
@ -203,6 +244,7 @@ public class DFSStripedInputStream extends DFSInputStream {
*/
@Override
protected void closeCurrentBlockReaders() {
resetCurStripeBuffer();
if (blockReaders == null || blockReaders.length == 0) {
return;
}
@ -220,6 +262,73 @@ public class DFSStripedInputStream extends DFSInputStream {
blockEnd = -1;
}
private long getOffsetInBlockGroup() {
return pos - currentLocatedBlock.getStartOffset();
}
/**
* Read a new stripe covering the current position, and store the data in the
* {@link #curStripeBuf}.
*/
private void readOneStripe(
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
resetCurStripeBuffer();
// compute stripe range based on pos
final long offsetInBlockGroup = getOffsetInBlockGroup();
final long stripeLen = cellSize * dataBlkNum;
int stripeIndex = (int) (offsetInBlockGroup / stripeLen);
curStripeRange = new StripeRange(stripeIndex * stripeLen,
Math.min(currentLocatedBlock.getBlockSize() - (stripeIndex * stripeLen),
stripeLen));
final int numCell = (int) ((curStripeRange.length - 1) / cellSize + 1);
// read the whole stripe in parallel
Map<Future<Integer>, Integer> futures = new HashMap<>();
for (int i = 0; i < numCell; i++) {
curStripeBuf.position(cellSize * i);
curStripeBuf.limit((int) Math.min(cellSize * (i + 1),
curStripeRange.length));
ByteBuffer buf = curStripeBuf.slice();
ByteBufferStrategy strategy = new ByteBufferStrategy(buf);
final int targetLength = buf.remaining();
Callable<Integer> readCallable = readCell(blockReaders[i],
currentNodes[i], strategy, targetLength, corruptedBlockMap);
Future<Integer> request = readingService.submit(readCallable);
futures.put(request, i);
}
while (!futures.isEmpty()) {
try {
waitNextCompletion(readingService, futures);
// TODO: decode and record bad reader if necessary
} catch (InterruptedException ignored) {
// ignore and retry
}
}
}
private Callable<Integer> readCell(final BlockReader reader,
final DatanodeInfo datanode, final ByteBufferStrategy strategy,
final int targetLength,
final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
return new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int result = 0;
while (result < targetLength) {
int ret = readBuffer(reader, datanode, strategy, corruptedBlockMap);
if (ret < 0) {
throw new IOException("Unexpected EOS from the reader");
}
result += ret;
}
updateReadStatistics(readStatistics, targetLength, reader);
return result;
}
};
}
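readOneStripe and waitNextCompletion follow the usual submit-then-drain idiom around ExecutorCompletionService. A self-contained sketch of that idiom with placeholder tasks instead of real cell reads (class name, pool size, and values are illustrative, not taken from the patch):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FanOutDrainDemo {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    CompletionService<Integer> service = new ExecutorCompletionService<>(pool);

    // Submit one task per "cell", remembering which cell each future serves.
    Map<Future<Integer>, Integer> futures = new HashMap<>();
    for (int i = 0; i < 6; i++) {
      final int cell = i;
      Future<Integer> f = service.submit(new Callable<Integer>() {
        @Override
        public Integer call() {
          return cell * 10;   // stand-in for "bytes read for this cell"
        }
      });
      futures.put(f, cell);
    }

    // Drain the completion queue: take() hands back futures in finish order.
    while (!futures.isEmpty()) {
      Future<Integer> done = service.take();
      System.out.println("cell " + futures.remove(done) + " -> " + done.get());
    }
    pool.shutdown();
  }
}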
@Override
protected synchronized int readWithStrategy(ReaderStrategy strategy,
int off, int len) throws IOException {
@ -227,11 +336,10 @@ public class DFSStripedInputStream extends DFSInputStream {
if (closed.get()) {
throw new IOException("Stream closed");
}
Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap = new HashMap<>();
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap =
new ConcurrentHashMap<>();
failures = 0;
if (pos < getFileLength()) {
/** Index of the target block in a stripe to read from */
int idxInGroup = (int) ((pos / cellSize) % dataBlkNum);
try {
if (pos > blockEnd) {
blockSeekTo(pos);
@ -247,40 +355,13 @@ public class DFSStripedInputStream extends DFSInputStream {
/** Number of bytes already read into buffer */
int result = 0;
while (result < realLen) {
/**
* Temporary position into the file; {@link pos} might not proceed
* to this temporary position in case of exceptions.
*/
long tmpPos = pos + result;
/** Start and end offsets of a cell in the file */
long cellStart = (tmpPos / cellSize) * cellSize;
long cellEnd = cellStart + cellSize - 1;
/** Number of bytes to read from the current cell */
int realLenInCell = (int) Math.min(realLen - result,
cellEnd - tmpPos + 1L);
assert realLenInCell > 0 : "Temporary position shouldn't be "
+ "after cellEnd";
// Read from one blockReader up to cell boundary
int cellRet = readBuffer(blockReaders[idxInGroup],
currentNodes[idxInGroup], strategy, off + result, realLenInCell,
corruptedBlockMap);
if (cellRet >= 0) {
result += cellRet;
if (cellRet < realLenInCell) {
// A short read indicates the current blockReader buffer is
// already drained. Should return the read call. Otherwise
// should proceed to the next cell.
break;
}
} else {
// got a EOS from reader though we expect more data on it.
throw new IOException("Unexpected EOS from the reader");
if (!curStripeRange.include(getOffsetInBlockGroup())) {
readOneStripe(corruptedBlockMap);
}
idxInGroup = (idxInGroup + 1) % dataBlkNum;
int ret = copy(strategy, off + result, realLen - result);
result += ret;
pos += ret;
}
pos += result;
if (dfsClient.stats != null) {
dfsClient.stats.incrementBytesRead(result);
}
@ -295,11 +376,11 @@ public class DFSStripedInputStream extends DFSInputStream {
return -1;
}
private synchronized int readBuffer(BlockReader blockReader,
DatanodeInfo currentNode, ReaderStrategy readerStrategy, int off, int len,
private int readBuffer(BlockReader blockReader,
DatanodeInfo currentNode, ByteBufferStrategy readerStrategy,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
try {
return readerStrategy.doRead(blockReader, off, len);
return readerStrategy.doRead(blockReader, 0, 0);
} catch ( ChecksumException ce ) {
DFSClient.LOG.warn("Found Checksum error for "
+ getCurrentBlock() + " from " + currentNode
@ -312,26 +393,25 @@ public class DFSStripedInputStream extends DFSInputStream {
+ getCurrentBlock() + " of " + src + " from "
+ currentNode, e);
}
// TODO: this should trigger decoding logic (HDFS-7678)
return -1;
}
protected class ByteBufferStrategy extends DFSInputStream.ByteBufferStrategy {
ByteBufferStrategy(ByteBuffer buf) {
super(buf);
}
@Override
public int doRead(BlockReader blockReader, int off, int len)
throws IOException {
int oldlimit = buf.limit();
if (buf.remaining() > len) {
buf.limit(buf.position() + len);
}
int ret = super.doRead(blockReader, off, len);
buf.limit(oldlimit);
return ret;
}
/**
* Copy the data from {@link #curStripeBuf} into the given buffer
* @param strategy the ReaderStrategy containing the given buffer
* @param offset the offset of the given buffer. Used only when strategy is
* a ByteArrayStrategy
* @param length target length
* @return number of bytes copied
*/
private int copy(ReaderStrategy strategy, int offset, int length) {
final long stripeLen = cellSize * dataBlkNum;
final long offsetInBlk = pos - currentLocatedBlock.getStartOffset();
// compute the position in the curStripeBuf based on "pos"
int bufOffset = (int) (offsetInBlk % stripeLen);
curStripeBuf.position(bufOffset);
return strategy.copyFrom(curStripeBuf, offset,
Math.min(length, curStripeBuf.remaining()));
}
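As a worked example with illustrative numbers (not values asserted by this patch): with a 64 KB cell and 6 data blocks, stripeLen is 384 KB; a position 400 KB past the block group's start gives offsetInBlk = 400 KB and bufOffset = 400 KB % 384 KB = 16 KB, so the copy starts 16 KB into curStripeBuf and runs to at most the end of the buffered stripe.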
/**
@ -366,8 +446,7 @@ public class DFSStripedInputStream extends DFSInputStream {
DFSClient.LOG.debug("getBlockAt for striped blocks, offset="
+ blkStartOffset + ". Obtained block " + lb + ", idx=" + idx);
}
return StripedBlockUtil.constructInternalBlock(lsb, i, cellSize,
dataBlkNum, idx);
return StripedBlockUtil.constructInternalBlock(lsb, i, cellSize, dataBlkNum, idx);
}
private LocatedStripedBlock getBlockGroupAt(long offset) throws IOException {
@ -404,7 +483,7 @@ public class DFSStripedInputStream extends DFSInputStream {
for (short i = 0; i < dataBlkNum; i++) {
ReadPortion rp = readPortions[i];
if (rp.readLength <= 0) {
if (rp.getReadLength() <= 0) {
continue;
}
DatanodeInfo loc = blks[i].getLocations()[0];
@ -413,8 +492,8 @@ public class DFSStripedInputStream extends DFSInputStream {
loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())),
type);
Callable<Void> readCallable = getFromOneDataNode(dnAddr,
blks[i].getStartOffset(), rp.startOffsetInBlock,
rp.startOffsetInBlock + rp.readLength - 1, buf,
blks[i].getStartOffset(), rp.getStartOffsetInBlock(),
rp.getStartOffsetInBlock() + rp.getReadLength() - 1, buf,
rp.getOffsets(), rp.getLengths(), corruptedBlockMap, i);
Future<Void> getFromDNRequest = stripedReadsService.submit(readCallable);
DFSClient.LOG.debug("Submitting striped read request for " + blks[i]);
@ -451,14 +530,14 @@ public class DFSStripedInputStream extends DFSInputStream {
};
}
private void waitNextCompletion(CompletionService<Void> stripedReadsService,
Map<Future<Void>, Integer> futures) throws InterruptedException {
private <T> void waitNextCompletion(CompletionService<T> service,
Map<Future<T>, Integer> futures) throws InterruptedException {
if (futures.isEmpty()) {
throw new InterruptedException("Futures already empty");
}
Future<Void> future = null;
Future<T> future = null;
try {
future = stripedReadsService.take();
future = service.take();
future.get();
futures.remove(future);
} catch (ExecutionException | CancellationException e) {

View File: StripedBlockUtil.java

@ -169,22 +169,22 @@ public class StripedBlockUtil {
// blkIdxInGroup is the index of the block in the striped block group
// E.g., blk_2 is the 3rd block in the group
final int blkIdxInGroup = (int) (startInBlk / cellSize % dataBlkNum);
results[blkIdxInGroup].startOffsetInBlock = cellSize * cellIdxInBlk +
startInBlk % cellSize;
results[blkIdxInGroup].setStartOffsetInBlock(cellSize * cellIdxInBlk +
startInBlk % cellSize);
boolean crossStripe = false;
for (int i = 1; i < dataBlkNum; i++) {
if (blkIdxInGroup + i >= dataBlkNum && !crossStripe) {
cellIdxInBlk++;
crossStripe = true;
}
results[(blkIdxInGroup + i) % dataBlkNum].startOffsetInBlock =
cellSize * cellIdxInBlk;
results[(blkIdxInGroup + i) % dataBlkNum].setStartOffsetInBlock(
cellSize * cellIdxInBlk);
}
int firstCellLen = Math.min(cellSize - (int) (startInBlk % cellSize), len);
results[blkIdxInGroup].offsetsInBuf.add(bufOffset);
results[blkIdxInGroup].lengths.add(firstCellLen);
results[blkIdxInGroup].readLength += firstCellLen;
results[blkIdxInGroup].addReadLength(firstCellLen);
int i = (blkIdxInGroup + 1) % dataBlkNum;
for (int done = firstCellLen; done < len; done += cellSize) {
@ -192,7 +192,7 @@ public class StripedBlockUtil {
rp.offsetsInBuf.add(done + bufOffset);
final int readLen = Math.min(len - done, cellSize);
rp.lengths.add(readLen);
rp.readLength += readLen;
rp.addReadLength(readLen);
i = (i + 1) % dataBlkNum;
}
return results;
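To make the per-block bookkeeping above concrete, here is a hedged toy version of the same splitting logic with small numbers; the pairs below are (offset within the internal block, bytes to read), whereas the real ReadPortion additionally records where each piece lands in the caller's buffer via offsetsInBuf and lengths:

import java.util.ArrayList;
import java.util.List;

/** Illustrative only: split one striped read into per-data-block portions. */
public class PlanPortionsDemo {
  public static void main(String[] args) {
    final int cellSize = 4, dataBlkNum = 3;
    final long startInGroup = 10;   // logical start offset inside the block group
    final int len = 9;              // number of bytes requested

    List<List<long[]>> portions = new ArrayList<>();
    for (int i = 0; i < dataBlkNum; i++) {
      portions.add(new ArrayList<long[]>());
    }

    int done = 0;
    while (done < len) {
      long offset = startInGroup + done;
      long cellIdx = offset / cellSize;                       // which cell overall
      int blkIdx = (int) (cellIdx % dataBlkNum);              // round-robin data block
      long offsetInBlk = (cellIdx / dataBlkNum) * cellSize + offset % cellSize;
      int chunk = (int) Math.min(len - done, cellSize - offset % cellSize);
      portions.get(blkIdx).add(new long[] {offsetInBlk, chunk});
      done += chunk;
    }

    // With these numbers: blk_0 -> (off=4, len=4), blk_1 -> (off=4, len=3),
    // blk_2 -> (off=2, len=2); together the three requests cover the 9 bytes.
    for (int i = 0; i < dataBlkNum; i++) {
      StringBuilder sb = new StringBuilder("blk_" + i + " ->");
      for (long[] p : portions.get(i)) {
        sb.append(" (off=").append(p[0]).append(", len=").append(p[1]).append(")");
      }
      System.out.println(sb);
    }
  }
}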
@ -274,8 +274,8 @@ public class StripedBlockUtil {
* | (partial) | (from blk_1 and blk_2) | |
* +------------------------------------------------------+
*/
public long startOffsetInBlock = 0;
public int readLength = 0;
private long startOffsetInBlock = 0;
private int readLength = 0;
public final List<Integer> offsetsInBuf = new ArrayList<>();
public final List<Integer> lengths = new ArrayList<>();
@ -295,10 +295,20 @@ public class StripedBlockUtil {
return lens;
}
public boolean containsReadPortion(ReadPortion rp) {
long end = startOffsetInBlock + readLength;
return startOffsetInBlock <= rp.startOffsetInBlock && end >=
rp.startOffsetInBlock + rp.readLength;
public long getStartOffsetInBlock() {
return startOffsetInBlock;
}
public int getReadLength() {
return readLength;
}
public void setStartOffsetInBlock(long startOffsetInBlock) {
this.startOffsetInBlock = startOffsetInBlock;
}
void addReadLength(int extraLength) {
this.readLength += extraLength;
}
}

View File: TestDFSStripedInputStream.java

@ -158,7 +158,7 @@ public class TestDFSStripedInputStream {
private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
throws IOException {
Path testPath = new Path(src);
byte[] bytes = generateBytes(writeBytes);
final byte[] bytes = generateBytes(writeBytes);
DFSTestUtil.writeFile(fs, testPath, new String(bytes));
//check file length
@ -175,7 +175,8 @@ public class TestDFSStripedInputStream {
Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen);
for (int i = 0; i < writeBytes; i++) {
Assert.assertEquals("Byte at i should be the same", getByte(i), buf[i]);
Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
buf[i]);
}
}
@ -190,12 +191,12 @@ public class TestDFSStripedInputStream {
readLen += ret;
}
} while (ret >= 0);
readLen = readLen >= 0 ? readLen : 0;
Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen);
for (int i = 0; i < writeBytes; i++) {
Assert.assertEquals("Byte at i should be the same", getByte(i), buf[i]);
Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
buf[i]);
}
}
@ -214,8 +215,47 @@ public class TestDFSStripedInputStream {
Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen);
for (int i = 0; i < writeBytes; i++) {
Assert.assertEquals("Byte at i should be the same", getByte(i), buf.array()[i]);
Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
buf.array()[i]);
}
}
// stateful read with 1KB size byte array
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
final byte[] result = new byte[writeBytes];
final byte[] buf = new byte[1024];
int readLen = 0;
int ret;
do {
ret = fsdis.read(buf, 0, buf.length);
if (ret > 0) {
System.arraycopy(buf, 0, result, readLen, ret);
readLen += ret;
}
} while (ret >= 0);
Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen);
Assert.assertArrayEquals(bytes, result);
}
// stateful read using ByteBuffer with 1KB size
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
final ByteBuffer result = ByteBuffer.allocate(writeBytes);
final ByteBuffer buf = ByteBuffer.allocate(1024);
int readLen = 0;
int ret;
do {
ret = fsdis.read(buf);
if (ret > 0) {
readLen += ret;
buf.flip();
result.put(buf);
buf.clear();
}
} while (ret >= 0);
Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen);
Assert.assertArrayEquals(bytes, result.array());
}
}
}

View File: TestPlanReadPortions.java

@ -38,8 +38,8 @@ public class TestPlanReadPortions {
assertEquals(GROUP_SIZE, results.length);
for (int i = 0; i < GROUP_SIZE; i++) {
assertEquals(readLengths[i], results[i].readLength);
assertEquals(offsetsInBlock[i], results[i].startOffsetInBlock);
assertEquals(readLengths[i], results[i].getReadLength());
assertEquals(offsetsInBlock[i], results[i].getStartOffsetInBlock());
final int[] bOffsets = results[i].getOffsets();
assertArrayEquals(bufferOffsets[i], bOffsets);
final int[] bLengths = results[i].getLengths();