diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt index 11e83760226..fed08e17971 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -186,3 +186,6 @@ HDFS-8129. Erasure Coding: Maintain consistent naming for Erasure Coding related classes - EC/ErasureCoding (umamahesh) + + HDFS-8203. Erasure Coding: Seek and other Ops in DFSStripedInputStream. + (Yi Liu via jing9) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index 7cb7b6dd860..9011192aab6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -19,10 +19,13 @@ package org.apache.hadoop.hdfs; import com.google.common.base.Preconditions; import org.apache.hadoop.fs.ChecksumException; +import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.io.ByteBufferPool; + import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion; import static org.apache.hadoop.hdfs.util.StripedBlockUtil.planReadPortions; @@ -31,9 +34,11 @@ import org.apache.htrace.Span; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; +import java.io.EOFException; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.EnumSet; import java.util.Set; import java.util.Map; import java.util.HashMap; @@ -263,6 +268,10 @@ public class DFSStripedInputStream extends DFSInputStream { } private long getOffsetInBlockGroup() { + return getOffsetInBlockGroup(pos); + } + + private long getOffsetInBlockGroup(long pos) { return pos - currentLocatedBlock.getStartOffset(); } @@ -278,18 +287,22 @@ public class DFSStripedInputStream extends DFSInputStream { // compute stripe range based on pos final long offsetInBlockGroup = getOffsetInBlockGroup(); final long stripeLen = cellSize * dataBlkNum; - int stripeIndex = (int) (offsetInBlockGroup / stripeLen); - curStripeRange = new StripeRange(stripeIndex * stripeLen, - Math.min(currentLocatedBlock.getBlockSize() - (stripeIndex * stripeLen), - stripeLen)); - final int numCell = (int) ((curStripeRange.length - 1) / cellSize + 1); + final int stripeIndex = (int) (offsetInBlockGroup / stripeLen); + final int stripeBufOffset = (int) (offsetInBlockGroup % stripeLen); + final int stripeLimit = (int) Math.min(currentLocatedBlock.getBlockSize() + - (stripeIndex * stripeLen), stripeLen); + curStripeRange = new StripeRange(offsetInBlockGroup, + stripeLimit - stripeBufOffset); + + final int startCell = stripeBufOffset / cellSize; + final int numCell = (stripeLimit - 1) / cellSize + 1; // read the whole stripe in parallel Map, Integer> futures = new HashMap<>(); - for (int i = 0; i < numCell; i++) { - curStripeBuf.position(cellSize * i); - curStripeBuf.limit((int) Math.min(cellSize * (i + 1), - curStripeRange.length)); + for (int i = startCell; i < numCell; i++) { + int bufPos = i == startCell ? stripeBufOffset : cellSize * i; + curStripeBuf.position(bufPos); + curStripeBuf.limit(Math.min(cellSize * (i + 1), stripeLimit)); ByteBuffer buf = curStripeBuf.slice(); ByteBufferStrategy strategy = new ByteBufferStrategy(buf); final int targetLength = buf.remaining(); @@ -329,6 +342,39 @@ public class DFSStripedInputStream extends DFSInputStream { }; } + /** + * Seek to a new arbitrary location + */ + @Override + public synchronized void seek(long targetPos) throws IOException { + if (targetPos > getFileLength()) { + throw new EOFException("Cannot seek after EOF"); + } + if (targetPos < 0) { + throw new EOFException("Cannot seek to negative offset"); + } + if (closed.get()) { + throw new IOException("Stream is closed!"); + } + if (targetPos <= blockEnd) { + final long targetOffsetInBlk = getOffsetInBlockGroup(targetPos); + if (curStripeRange.include(targetOffsetInBlk)) { + int bufOffset = getStripedBufOffset(targetOffsetInBlk); + curStripeBuf.position(bufOffset); + pos = targetPos; + return; + } + } + pos = targetPos; + blockEnd = -1; + } + + private int getStripedBufOffset(long offsetInBlockGroup) { + final long stripeLen = cellSize * dataBlkNum; + // compute the position in the curStripeBuf based on "pos" + return (int) (offsetInBlockGroup % stripeLen); + } + @Override protected synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException { @@ -405,10 +451,8 @@ public class DFSStripedInputStream extends DFSInputStream { * @return number of bytes copied */ private int copy(ReaderStrategy strategy, int offset, int length) { - final long stripeLen = cellSize * dataBlkNum; - final long offsetInBlk = pos - currentLocatedBlock.getStartOffset(); - // compute the position in the curStripeBuf based on "pos" - int bufOffset = (int) (offsetInBlk % stripeLen); + final long offsetInBlk = getOffsetInBlockGroup(); + int bufOffset = getStripedBufOffset(offsetInBlk); curStripeBuf.position(bufOffset); return strategy.copyFrom(curStripeBuf, offset, Math.min(length, curStripeBuf.remaining())); @@ -546,4 +590,22 @@ public class DFSStripedInputStream extends DFSInputStream { } throw new InterruptedException("let's retry"); } + + /** + * May need online read recovery, zero-copy read doesn't make + * sense, so don't support it. + */ + @Override + public synchronized ByteBuffer read(ByteBufferPool bufferPool, + int maxLength, EnumSet opts) + throws IOException, UnsupportedOperationException { + throw new UnsupportedOperationException( + "Not support enhanced byte buffer access."); + } + + @Override + public synchronized void releaseBuffer(ByteBuffer buffer) { + throw new UnsupportedOperationException( + "Not support enhanced byte buffer access."); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java index eacc6edddf6..5c6f449f5f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java @@ -22,12 +22,12 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.HdfsConstants; - import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; @@ -150,11 +150,35 @@ public class TestWriteReadStripedFile { return bytes; } + private int readAll(FSDataInputStream in, byte[] buf) throws IOException { + int readLen = 0; + int ret; + do { + ret = in.read(buf, readLen, buf.length - readLen); + if (ret > 0) { + readLen += ret; + } + } while (ret >= 0 && readLen < buf.length); + return readLen; + } + private byte getByte(long pos) { final int mod = 29; return (byte) (pos % mod + 1); } + private void assertSeekAndRead(FSDataInputStream fsdis, int pos, + int writeBytes) throws IOException { + fsdis.seek(pos); + byte[] buf = new byte[writeBytes]; + int readLen = readAll(fsdis, buf); + Assert.assertEquals(readLen, writeBytes - pos); + for (int i = 0; i < readLen; i++) { + Assert.assertEquals("Byte at " + i + " should be the same", + getByte(pos + i), buf[i]); + } + } + private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes) throws IOException { Path testPath = new Path(src); @@ -183,15 +207,7 @@ public class TestWriteReadStripedFile { // stateful read with byte array try (FSDataInputStream fsdis = fs.open(new Path(src))) { byte[] buf = new byte[writeBytes + 100]; - int readLen = 0; - int ret; - do { - ret = fsdis.read(buf, readLen, buf.length - readLen); - if (ret > 0) { - readLen += ret; - } - } while (ret >= 0); - readLen = readLen >= 0 ? readLen : 0; + int readLen = readAll(fsdis, buf); Assert.assertEquals("The length of file should be the same to write size", writeBytes, readLen); for (int i = 0; i < writeBytes; i++) { @@ -200,6 +216,53 @@ public class TestWriteReadStripedFile { } } + // seek and stateful read + try (FSDataInputStream fsdis = fs.open(new Path(src))) { + // seek to 1/2 of content + int pos = writeBytes/2; + assertSeekAndRead(fsdis, pos, writeBytes); + + // seek to 1/3 of content + pos = writeBytes/3; + assertSeekAndRead(fsdis, pos, writeBytes); + + // seek to 0 pos + pos = 0; + assertSeekAndRead(fsdis, pos, writeBytes); + + if (writeBytes > cellSize) { + // seek to cellSize boundary + pos = cellSize -1; + assertSeekAndRead(fsdis, pos, writeBytes); + } + + if (writeBytes > cellSize * dataBlocks) { + // seek to striped cell group boundary + pos = cellSize * dataBlocks - 1; + assertSeekAndRead(fsdis, pos, writeBytes); + } + + if (writeBytes > blockSize * dataBlocks) { + // seek to striped block group boundary + pos = blockSize * dataBlocks - 1; + assertSeekAndRead(fsdis, pos, writeBytes); + } + + try { + fsdis.seek(-1); + Assert.fail("Should be failed if seek to negative offset"); + } catch (EOFException e) { + // expected + } + + try { + fsdis.seek(writeBytes + 1); + Assert.fail("Should be failed if seek after EOF"); + } catch (EOFException e) { + // expected + } + } + // stateful read with ByteBuffer try (FSDataInputStream fsdis = fs.open(new Path(src))) { ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100);