diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index e30b2ed2ec7..77272e7adb7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -161,3 +161,6 @@
 
     HDFS-8316. Erasure coding: refactor EC constants to be consistent with
     HDFS-8249. (Zhe Zhang via jing9)
+
+    HDFS-8281. Erasure Coding: implement parallel stateful reading for striped
+    layout. (jing9)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 6e58cd60019..0d51a5752da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -717,6 +717,16 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   interface ReaderStrategy {
     public int doRead(BlockReader blockReader, int off, int len)
         throws ChecksumException, IOException;
+
+    /**
+     * Copy data from the src ByteBuffer into the read buffer.
+     * @param src The src buffer where the data is copied from
+     * @param offset Useful only when the ReadStrategy is based on a byte array.
+     *               Indicate the offset of the byte array for copy.
+     * @param length Useful only when the ReadStrategy is based on a byte array.
+     *               Indicate the length of the data to copy.
+     */
+    public int copyFrom(ByteBuffer src, int offset, int length);
   }
 
   protected void updateReadStatistics(ReadStatistics readStatistics,
@@ -750,6 +760,13 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       updateReadStatistics(readStatistics, nRead, blockReader);
       return nRead;
     }
+
+    @Override
+    public int copyFrom(ByteBuffer src, int offset, int length) {
+      ByteBuffer writeSlice = src.duplicate();
+      writeSlice.get(buf, offset, length);
+      return length;
+    }
   }
 
   /**
@@ -783,6 +800,15 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         }
       }
     }
+
+    @Override
+    public int copyFrom(ByteBuffer src, int offset, int length) {
+      ByteBuffer writeSlice = src.duplicate();
+      int remaining = Math.min(buf.remaining(), writeSlice.remaining());
+      writeSlice.limit(writeSlice.position() + remaining);
+      buf.put(writeSlice);
+      return remaining;
+    }
   }
 
   /* This is a used by regular read() and handles ChecksumExceptions.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 0dc98fdbb00..13c4743040c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.*;
@@ -37,6 +38,7 @@ import java.util.Set;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.CancellationException;
@@ -62,7 +64,7 @@ import java.util.concurrent.Future;
 * +------+  <- A cell contains {@link #cellSize} bytes of data
 *
 * Three styles of read will eventually be supported:
- *   1. Stateful read: TODO: HDFS-8033
+ *   1. Stateful read
 *   2. pread without decode support
 *      This is implemented by calculating the portion of read from each block and
 *      issuing requests to each DataNode in parallel.
@@ -91,12 +93,38 @@ public class DFSStripedInputStream extends DFSInputStream {
     }
   }
 
+  /** Used to indicate the buffered data's range in the block group */
+  private static class StripeRange {
+    /** start offset in the block group (inclusive) */
+    final long offsetInBlock;
+    /** length of the stripe range */
+    final long length;
+
+    StripeRange(long offsetInBlock, long length) {
+      Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
+      this.offsetInBlock = offsetInBlock;
+      this.length = length;
+    }
+
+    boolean include(long pos) {
+      return pos >= offsetInBlock && pos < offsetInBlock + length;
+    }
+  }
+
   private final short groupSize = HdfsConstants.NUM_DATA_BLOCKS;
   private final BlockReader[] blockReaders = new BlockReader[groupSize];
   private final DatanodeInfo[] currentNodes = new DatanodeInfo[groupSize];
   private final int cellSize;
   private final short dataBlkNum;
   private final short parityBlkNum;
+  /** the buffer for a complete stripe */
+  private ByteBuffer curStripeBuf;
+  /**
+   * indicate the start/end offset of the current buffered stripe in the
+   * block group
+   */
+  private StripeRange curStripeRange;
+  private final CompletionService<Integer> readingService;
 
   DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
       ECInfo ecInfo) throws IOException {
@@ -106,7 +134,20 @@ public class DFSStripedInputStream extends DFSInputStream {
     cellSize = ecInfo.getSchema().getChunkSize();
     dataBlkNum = (short) ecInfo.getSchema().getNumDataUnits();
     parityBlkNum = (short) ecInfo.getSchema().getNumParityUnits();
-    DFSClient.LOG.debug("Creating an striped input stream for file " + src);
+    curStripeRange = new StripeRange(0, 0);
+    readingService =
+        new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
+    if (DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug("Creating an striped input stream for file " + src);
+    }
+  }
+
+  private void resetCurStripeBuffer() {
+    if (curStripeBuf == null) {
+      curStripeBuf = ByteBuffer.allocateDirect(cellSize * dataBlkNum);
+    }
+    curStripeBuf.clear();
+    curStripeRange = new StripeRange(0, 0);
   }
 
   @Override
@@ -141,7 +182,7 @@ public class DFSStripedInputStream extends DFSInputStream {
         targetBlockGroup.getBlockSize() - 1;
     currentLocatedBlock = targetBlockGroup;
 
-    long offsetIntoBlockGroup = target - targetBlockGroup.getStartOffset();
+    final long offsetIntoBlockGroup = getOffsetInBlockGroup();
     LocatedBlock[] targetBlocks = StripedBlockUtil.parseStripedBlockGroup(
         targetBlockGroup, cellSize, dataBlkNum, parityBlkNum);
     // The purpose is to get start offset into each block
@@ -156,8 +197,8 @@ public class DFSStripedInputStream extends DFSInputStream {
       if (retval != null) {
         currentNodes[i] = retval.info;
         blockReaders[i] = getBlockReaderWithRetry(targetBlock,
-            readPortions[i].startOffsetInBlock,
-            targetBlock.getBlockSize() - readPortions[i].startOffsetInBlock,
+            readPortions[i].getStartOffsetInBlock(),
+            targetBlock.getBlockSize() - readPortions[i].getStartOffsetInBlock(),
             retval.addr, retval.storageType, retval.info, target, retry);
       }
     }
@@ -203,6 +244,7 @@ public class DFSStripedInputStream extends DFSInputStream {
    */
   @Override
   protected void closeCurrentBlockReaders() {
+    resetCurStripeBuffer();
     if (blockReaders == null || blockReaders.length == 0) {
       return;
     }
@@ -220,6 +262,73 @@ public class DFSStripedInputStream extends DFSInputStream {
     blockEnd = -1;
   }
 
+  private long getOffsetInBlockGroup() {
+    return pos - currentLocatedBlock.getStartOffset();
+  }
+
+  /**
+   * Read a new stripe covering the current position, and store the data in the
+   * {@link #curStripeBuf}.
+   */
+  private void readOneStripe(
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
+    resetCurStripeBuffer();
+
+    // compute stripe range based on pos
+    final long offsetInBlockGroup = getOffsetInBlockGroup();
+    final long stripeLen = cellSize * dataBlkNum;
+    int stripeIndex = (int) (offsetInBlockGroup / stripeLen);
+    curStripeRange = new StripeRange(stripeIndex * stripeLen,
+        Math.min(currentLocatedBlock.getBlockSize() - (stripeIndex * stripeLen),
+            stripeLen));
+    final int numCell = (int) ((curStripeRange.length - 1) / cellSize + 1);
+
+    // read the whole stripe in parallel
+    Map<Future<Integer>, Integer> futures = new HashMap<>();
+    for (int i = 0; i < numCell; i++) {
+      curStripeBuf.position(cellSize * i);
+      curStripeBuf.limit((int) Math.min(cellSize * (i + 1),
+          curStripeRange.length));
+      ByteBuffer buf = curStripeBuf.slice();
+      ByteBufferStrategy strategy = new ByteBufferStrategy(buf);
+      final int targetLength = buf.remaining();
+      Callable<Integer> readCallable = readCell(blockReaders[i],
+          currentNodes[i], strategy, targetLength, corruptedBlockMap);
+      Future<Integer> request = readingService.submit(readCallable);
+      futures.put(request, i);
+    }
+    while (!futures.isEmpty()) {
+      try {
+        waitNextCompletion(readingService, futures);
+        // TODO: decode and record bad reader if necessary
+      } catch (InterruptedException ignored) {
+        // ignore and retry
+      }
+    }
+  }
+
+  private Callable<Integer> readCell(final BlockReader reader,
+      final DatanodeInfo datanode, final ByteBufferStrategy strategy,
+      final int targetLength,
+      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+    return new Callable<Integer>() {
+      @Override
+      public Integer call() throws Exception {
+        int result = 0;
+        while (result < targetLength) {
+          int ret = readBuffer(reader, datanode, strategy, corruptedBlockMap);
+          if (ret < 0) {
+            throw new IOException("Unexpected EOS from the reader");
+          }
+          result += ret;
+        }
+        updateReadStatistics(readStatistics, targetLength, reader);
+        return result;
+      }
+    };
+  }
+
   @Override
   protected synchronized int readWithStrategy(ReaderStrategy strategy,
       int off, int len) throws IOException {
@@ -227,11 +336,10 @@ public class DFSStripedInputStream extends DFSInputStream {
     if (closed.get()) {
       throw new IOException("Stream closed");
     }
-    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap = new HashMap<>();
+    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap =
+        new ConcurrentHashMap<>();
     failures = 0;
     if (pos < getFileLength()) {
-      /** Index of the target block in a stripe to read from */
-      int idxInGroup = (int) ((pos / cellSize) % dataBlkNum);
       try {
         if (pos > blockEnd) {
           blockSeekTo(pos);
         }
@@ -247,40 +355,13 @@ public class DFSStripedInputStream extends DFSInputStream {
         /** Number of bytes already read into buffer */
         int result = 0;
         while (result < realLen) {
-          /**
-           * Temporary position into the file; {@link pos} might not proceed
-           * to this temporary position in case of exceptions.
-           */
-          long tmpPos = pos + result;
-          /** Start and end offsets of a cell in the file */
-          long cellStart = (tmpPos / cellSize) * cellSize;
-          long cellEnd = cellStart + cellSize - 1;
-
-          /** Number of bytes to read from the current cell */
-          int realLenInCell = (int) Math.min(realLen - result,
-              cellEnd - tmpPos + 1L);
-          assert realLenInCell > 0 : "Temporary position shouldn't be "
-              + "after cellEnd";
-
-          // Read from one blockReader up to cell boundary
-          int cellRet = readBuffer(blockReaders[idxInGroup],
-              currentNodes[idxInGroup], strategy, off + result, realLenInCell,
-              corruptedBlockMap);
-          if (cellRet >= 0) {
-            result += cellRet;
-            if (cellRet < realLenInCell) {
-              // A short read indicates the current blockReader buffer is
-              // already drained. Should return the read call. Otherwise
-              // should proceed to the next cell.
-              break;
-            }
-          } else {
-            // got a EOS from reader though we expect more data on it.
-            throw new IOException("Unexpected EOS from the reader");
+          if (!curStripeRange.include(getOffsetInBlockGroup())) {
+            readOneStripe(corruptedBlockMap);
           }
-          idxInGroup = (idxInGroup + 1) % dataBlkNum;
+          int ret = copy(strategy, off + result, realLen - result);
+          result += ret;
+          pos += ret;
         }
-        pos += result;
         if (dfsClient.stats != null) {
           dfsClient.stats.incrementBytesRead(result);
         }
@@ -295,11 +376,11 @@ public class DFSStripedInputStream extends DFSInputStream {
     return -1;
   }
 
-  private synchronized int readBuffer(BlockReader blockReader,
-      DatanodeInfo currentNode, ReaderStrategy readerStrategy, int off, int len,
+  private int readBuffer(BlockReader blockReader,
+      DatanodeInfo currentNode, ByteBufferStrategy readerStrategy,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
     try {
-      return readerStrategy.doRead(blockReader, off, len);
+      return readerStrategy.doRead(blockReader, 0, 0);
     } catch ( ChecksumException ce ) {
       DFSClient.LOG.warn("Found Checksum error for "
           + getCurrentBlock() + " from " + currentNode
@@ -312,26 +393,25 @@ public class DFSStripedInputStream extends DFSInputStream {
           + getCurrentBlock() + " of " + src + " from " + currentNode, e);
     }
-    // TODO: this should trigger decoding logic (HDFS-7678)
     return -1;
   }
 
-  protected class ByteBufferStrategy extends DFSInputStream.ByteBufferStrategy {
-    ByteBufferStrategy(ByteBuffer buf) {
-      super(buf);
-    }
-
-    @Override
-    public int doRead(BlockReader blockReader, int off, int len)
-        throws IOException {
-      int oldlimit = buf.limit();
-      if (buf.remaining() > len) {
-        buf.limit(buf.position() + len);
-      }
-      int ret = super.doRead(blockReader, off, len);
-      buf.limit(oldlimit);
-      return ret;
-    }
+  /**
+   * Copy the data from {@link #curStripeBuf} into the given buffer
+   * @param strategy the ReaderStrategy containing the given buffer
+   * @param offset the offset of the given buffer. Used only when strategy is
+   *               a ByteArrayStrategy
+   * @param length target length
+   * @return number of bytes copied
+   */
+  private int copy(ReaderStrategy strategy, int offset, int length) {
+    final long stripeLen = cellSize * dataBlkNum;
+    final long offsetInBlk = pos - currentLocatedBlock.getStartOffset();
+    // compute the position in the curStripeBuf based on "pos"
+    int bufOffset = (int) (offsetInBlk % stripeLen);
+    curStripeBuf.position(bufOffset);
+    return strategy.copyFrom(curStripeBuf, offset,
+        Math.min(length, curStripeBuf.remaining()));
   }
 
   /**
@@ -366,8 +446,7 @@ public class DFSStripedInputStream extends DFSInputStream {
       DFSClient.LOG.debug("getBlockAt for striped blocks, offset="
          + blkStartOffset + ". Obtained block " + lb + ", idx=" + idx);
     }
-    return StripedBlockUtil.constructInternalBlock(lsb, i, cellSize,
-        dataBlkNum, idx);
+    return StripedBlockUtil.constructInternalBlock(lsb, i, cellSize, dataBlkNum, idx);
   }
 
   private LocatedStripedBlock getBlockGroupAt(long offset) throws IOException {
@@ -404,7 +483,7 @@ public class DFSStripedInputStream extends DFSInputStream {
 
     for (short i = 0; i < dataBlkNum; i++) {
       ReadPortion rp = readPortions[i];
-      if (rp.readLength <= 0) {
+      if (rp.getReadLength() <= 0) {
        continue;
      }
      DatanodeInfo loc = blks[i].getLocations()[0];
@@ -413,8 +492,8 @@ public class DFSStripedInputStream extends DFSInputStream {
          loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())),
          type);
      Callable<Void> readCallable = getFromOneDataNode(dnAddr,
-          blks[i].getStartOffset(), rp.startOffsetInBlock,
-          rp.startOffsetInBlock + rp.readLength - 1, buf,
+          blks[i].getStartOffset(), rp.getStartOffsetInBlock(),
+          rp.getStartOffsetInBlock() + rp.getReadLength() - 1, buf,
          rp.getOffsets(), rp.getLengths(), corruptedBlockMap, i);
      Future<Void> getFromDNRequest = stripedReadsService.submit(readCallable);
      DFSClient.LOG.debug("Submitting striped read request for " + blks[i]);
@@ -451,14 +530,14 @@ public class DFSStripedInputStream extends DFSInputStream {
    };
  }

-  private void waitNextCompletion(CompletionService<Void> stripedReadsService,
-      Map<Future<Void>, Integer> futures) throws InterruptedException {
+  private <T> void waitNextCompletion(CompletionService<T> service,
+      Map<Future<T>, Integer> futures) throws InterruptedException {
    if (futures.isEmpty()) {
      throw new InterruptedException("Futures already empty");
    }
-    Future<Void> future = null;
+    Future<T> future = null;
    try {
-      future = stripedReadsService.take();
+      future = service.take();
      future.get();
      futures.remove(future);
    } catch (ExecutionException | CancellationException e) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index b18e36f0fb4..24d4bfba844 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -169,22 +169,22 @@ public class StripedBlockUtil {
    // blkIdxInGroup is the index of the block in the striped block group
    // E.g., blk_2 is the 3rd block in the group
    final int blkIdxInGroup = (int) (startInBlk / cellSize % dataBlkNum);
-    results[blkIdxInGroup].startOffsetInBlock = cellSize * cellIdxInBlk +
-        startInBlk % cellSize;
+    results[blkIdxInGroup].setStartOffsetInBlock(cellSize * cellIdxInBlk +
+        startInBlk % cellSize);
    boolean crossStripe = false;
    for (int i = 1; i < dataBlkNum; i++) {
      if (blkIdxInGroup + i >= dataBlkNum && !crossStripe) {
        cellIdxInBlk++;
        crossStripe = true;
      }
-      results[(blkIdxInGroup + i) % dataBlkNum].startOffsetInBlock =
-          cellSize * cellIdxInBlk;
+      results[(blkIdxInGroup + i) % dataBlkNum].setStartOffsetInBlock(
+          cellSize * cellIdxInBlk);
    }
    int firstCellLen = Math.min(cellSize - (int) (startInBlk % cellSize), len);
    results[blkIdxInGroup].offsetsInBuf.add(bufOffset);
    results[blkIdxInGroup].lengths.add(firstCellLen);
-    results[blkIdxInGroup].readLength += firstCellLen;
+    results[blkIdxInGroup].addReadLength(firstCellLen);

    int i = (blkIdxInGroup + 1) % dataBlkNum;
    for (int done = firstCellLen; done < len; done += cellSize) {
@@ -192,7 +192,7 @@ public class StripedBlockUtil {
      rp.offsetsInBuf.add(done + bufOffset);
      final int readLen = Math.min(len - done, cellSize);
      rp.lengths.add(readLen);
-      rp.readLength += readLen;
+      rp.addReadLength(readLen);
      i = (i + 1) % dataBlkNum;
    }
    return results;
@@ -274,8 +274,8 @@ public class StripedBlockUtil {
     * |  (partial)  |  (from blk_1 and blk_2)  |          |
     * +------------------------------------------------------+
     */
-    public long startOffsetInBlock = 0;
-    public int readLength = 0;
+    private long startOffsetInBlock = 0;
+    private int readLength = 0;
    public final List<Integer> offsetsInBuf = new ArrayList<>();
    public final List<Integer> lengths = new ArrayList<>();
@@ -295,10 +295,20 @@ public class StripedBlockUtil {
      return lens;
    }

-    public boolean containsReadPortion(ReadPortion rp) {
-      long end = startOffsetInBlock + readLength;
-      return startOffsetInBlock <= rp.startOffsetInBlock && end >=
-          rp.startOffsetInBlock + rp.readLength;
+    public long getStartOffsetInBlock() {
+      return startOffsetInBlock;
+    }
+
+    public int getReadLength() {
+      return readLength;
+    }
+
+    public void setStartOffsetInBlock(long startOffsetInBlock) {
+      this.startOffsetInBlock = startOffsetInBlock;
+    }
+
+    void addReadLength(int extraLength) {
+      this.readLength += extraLength;
    }
  }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index bcfc74b3d5b..11cdf7b4985 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -158,7 +158,7 @@ public class TestDFSStripedInputStream {
  private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
      throws IOException {
    Path testPath = new Path(src);
-    byte[] bytes = generateBytes(writeBytes);
+    final byte[] bytes = generateBytes(writeBytes);
    DFSTestUtil.writeFile(fs, testPath, new String(bytes));

    //check file length
@@ -175,7 +175,8 @@ public class TestDFSStripedInputStream {
      Assert.assertEquals("The length of file should be the same to write size",
          writeBytes, readLen);
      for (int i = 0; i < writeBytes; i++) {
-        Assert.assertEquals("Byte at i should be the same", getByte(i), buf[i]);
+        Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
+            buf[i]);
      }
    }
@@ -190,12 +191,12 @@ public class TestDFSStripedInputStream {
          readLen += ret;
        }
      } while (ret >= 0);
-      readLen = readLen >= 0 ? readLen : 0;
      Assert.assertEquals("The length of file should be the same to write size",
          writeBytes, readLen);
      for (int i = 0; i < writeBytes; i++) {
-        Assert.assertEquals("Byte at i should be the same", getByte(i), buf[i]);
+        Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
+            buf[i]);
      }
    }
@@ -214,8 +215,47 @@ public class TestDFSStripedInputStream {
      Assert.assertEquals("The length of file should be the same to write size",
          writeBytes, readLen);
      for (int i = 0; i < writeBytes; i++) {
-        Assert.assertEquals("Byte at i should be the same", getByte(i), buf.array()[i]);
+        Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
+            buf.array()[i]);
      }
    }
+
+    // stateful read with 1KB size byte array
+    try (FSDataInputStream fsdis = fs.open(new Path(src))) {
+      final byte[] result = new byte[writeBytes];
+      final byte[] buf = new byte[1024];
+      int readLen = 0;
+      int ret;
+      do {
+        ret = fsdis.read(buf, 0, buf.length);
+        if (ret > 0) {
+          System.arraycopy(buf, 0, result, readLen, ret);
+          readLen += ret;
+        }
+      } while (ret >= 0);
+      Assert.assertEquals("The length of file should be the same to write size",
+          writeBytes, readLen);
+      Assert.assertArrayEquals(bytes, result);
+    }
+
+    // stateful read using ByteBuffer with 1KB size
+    try (FSDataInputStream fsdis = fs.open(new Path(src))) {
+      final ByteBuffer result = ByteBuffer.allocate(writeBytes);
+      final ByteBuffer buf = ByteBuffer.allocate(1024);
+      int readLen = 0;
+      int ret;
+      do {
+        ret = fsdis.read(buf);
+        if (ret > 0) {
+          readLen += ret;
+          buf.flip();
+          result.put(buf);
+          buf.clear();
+        }
+      } while (ret >= 0);
+      Assert.assertEquals("The length of file should be the same to write size",
+          writeBytes, readLen);
+      Assert.assertArrayEquals(bytes, result.array());
+    }
  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
index 3b5787a4a94..75d05879f21 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPlanReadPortions.java
@@ -38,8 +38,8 @@ public class TestPlanReadPortions {

    assertEquals(GROUP_SIZE, results.length);
    for (int i = 0; i < GROUP_SIZE; i++) {
-      assertEquals(readLengths[i], results[i].readLength);
-      assertEquals(offsetsInBlock[i], results[i].startOffsetInBlock);
+      assertEquals(readLengths[i], results[i].getReadLength());
+      assertEquals(offsetsInBlock[i], results[i].getStartOffsetInBlock());
      final int[] bOffsets = results[i].getOffsets();
      assertArrayEquals(bufferOffsets[i], bOffsets);
      final int[] bLengths = results[i].getLengths();
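
A minimal client-side sketch (not part of the patch) of the stateful read path this change parallelizes. It assumes a file already written at the illustrative path /ecDir/file and stored with the striped layout, so FileSystem#open returns a DFSStripedInputStream underneath; all calls used are standard Hadoop FileSystem APIs, and the buffer size is arbitrary.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StatefulStripedReadExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(conf);
         FSDataInputStream in = fs.open(new Path("/ecDir/file"))) {
      byte[] buf = new byte[4096];
      long total = 0;
      int n;
      // Each read() advances the stream position. Whenever the position moves
      // past the currently buffered stripe, the striped stream fetches the next
      // full stripe from the DataNodes in parallel (readOneStripe) and then
      // serves this copy out of the in-memory stripe buffer (copy/copyFrom).
      while ((n = in.read(buf, 0, buf.length)) > 0) {
        total += n;
      }
      System.out.println("read " + total + " bytes");
    }
  }
}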