From 89d33785780f98a58e1e81eca2c27165840475df Mon Sep 17 00:00:00 2001
From: Zhe Zhang
Date: Fri, 24 Apr 2015 22:36:15 -0700
Subject: [PATCH] HDFS-8033. Erasure coding: stateful (non-positional) read
 from files in striped layout. Contributed by Zhe Zhang.

---
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt      |   3 +
 .../apache/hadoop/hdfs/DFSInputStream.java    |  55 ++--
 .../hadoop/hdfs/DFSStripedInputStream.java    | 311 +++++++++++++++++-
 .../hdfs/TestDFSStripedInputStream.java       |  43 +++
 .../hadoop/hdfs/TestReadStripedFile.java      | 110 ++++++-
 5 files changed, 465 insertions(+), 57 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index cf41a9b2ae1..e8db485ef8e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -131,3 +131,6 @@
     HDFS-8228. Erasure Coding: SequentialBlockGroupIdGenerator#nextValue may cause
     block id conflicts (Jing Zhao via Zhe Zhang)
+
+    HDFS-8033. Erasure coding: stateful (non-positional) read from files in
+    striped layout (Zhe Zhang)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index ecf74f7abbe..6649f4c87ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -96,34 +96,34 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   public static boolean tcpReadsDisabledForTesting = false;
   private long hedgedReadOpsLoopNumForTesting = 0;
   protected final DFSClient dfsClient;
-  private AtomicBoolean closed = new AtomicBoolean(false);
-  private final String src;
-  private final boolean verifyChecksum;
+  protected AtomicBoolean closed = new AtomicBoolean(false);
+  protected final String src;
+  protected final boolean verifyChecksum;
 
   // state by stateful read only:
   // (protected by lock on this)
   /////
   private DatanodeInfo currentNode = null;
-  private LocatedBlock currentLocatedBlock = null;
-  private long pos = 0;
-  private long blockEnd = -1;
+  protected LocatedBlock currentLocatedBlock = null;
+  protected long pos = 0;
+  protected long blockEnd = -1;
   private BlockReader blockReader = null;
   ////
 
   // state shared by stateful and positional read:
   // (protected by lock on infoLock)
   ////
-  private LocatedBlocks locatedBlocks = null;
+  protected LocatedBlocks locatedBlocks = null;
   private long lastBlockBeingWrittenLength = 0;
   private FileEncryptionInfo fileEncryptionInfo = null;
-  private CachingStrategy cachingStrategy;
+  protected CachingStrategy cachingStrategy;
   ////
 
-  private final ReadStatistics readStatistics = new ReadStatistics();
+  protected final ReadStatistics readStatistics = new ReadStatistics();
   // lock for state shared between read and pread
   // Note: Never acquire a lock on <this> with this lock held to avoid deadlocks
   //       (it's OK to acquire this lock when the lock on <this> is held)
-  private final Object infoLock = new Object();
+  protected final Object infoLock = new Object();
 
   /**
    * Track the ByteBuffers that we have handed out to readers.
@@ -240,7 +240,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * back to the namenode to get a new list of block locations, and is
    * capped at maxBlockAcquireFailures
    */
-  private int failures = 0;
+  protected int failures = 0;
 
   /* XXX Use of CocurrentHashMap is temp fix. Need to fix
    * parallel accesses to DFSInputStream (through ptreads) properly */
@@ -477,7 +477,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   }
 
   /** Fetch a block from namenode and cache it */
-  private void fetchBlockAt(long offset) throws IOException {
+  protected void fetchBlockAt(long offset) throws IOException {
     synchronized(infoLock) {
       int targetBlockIdx = locatedBlocks.findBlock(offset);
       if (targetBlockIdx < 0) { // block is not cached
@@ -580,7 +580,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     }
 
     // Will be getting a new BlockReader.
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
 
     //
     // Connect to best DataNode for desired Block, with potential offset
@@ -621,7 +621,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       return chosenNode;
     } catch (IOException ex) {
       if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
-        DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+        DFSClient.LOG.info("Will fetch a new encryption key and retry, "
            + "encryption key was invalid when connecting to " + targetAddr
            + " : " + ex);
         // The encryption key used is invalid.
@@ -697,7 +697,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           "unreleased ByteBuffers allocated by read(). " +
           "Please release " + builder.toString() + ".");
     }
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
     super.close();
   }
 
@@ -719,7 +719,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         throws ChecksumException, IOException;
   }
 
-  private void updateReadStatistics(ReadStatistics readStatistics,
+  protected void updateReadStatistics(ReadStatistics readStatistics,
       int nRead, BlockReader blockReader) {
     if (nRead <= 0) return;
     synchronized(infoLock) {
@@ -755,7 +755,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   /**
    * Used to read bytes into a user-supplied ByteBuffer
    */
-  private class ByteBufferStrategy implements ReaderStrategy {
+  protected class ByteBufferStrategy implements ReaderStrategy {
     final ByteBuffer buf;
     ByteBufferStrategy(ByteBuffer buf) {
       this.buf = buf;
@@ -771,6 +771,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         int ret = blockReader.read(buf);
         success = true;
         updateReadStatistics(readStatistics, ret, blockReader);
+        if (ret == 0) {
+          DFSClient.LOG.warn("BlockReader.read() returned zero bytes for " + src);
+        }
         return ret;
       } finally {
         if (!success) {
@@ -838,7 +841,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     }
   }
 
-  private synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
+  protected synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
     dfsClient.checkOpen();
     if (closed.get()) {
       throw new IOException("Stream closed");
@@ -927,7 +930,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
 
   /**
    * Add corrupted block replica into map.
    */
-  private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
+  protected void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
     Set<DatanodeInfo> dnSet = null;
     if((corruptedBlockMap.containsKey(blk))) {
@@ -999,7 +1002,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @param ignoredNodes Do not choose nodes in this array (may be null)
    * @return The DNAddrPair of the best node. Null if no node can be chosen.
    */
-  private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
+  protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes) {
     DatanodeInfo[] nodes = block.getLocations();
     StorageType[] storageTypes = block.getStorageTypes();
@@ -1368,7 +1371,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @return true if block access token has expired or invalid and it should be
    *         refetched
    */
-  private static boolean tokenRefetchNeeded(IOException ex,
+  protected static boolean tokenRefetchNeeded(IOException ex,
       InetSocketAddress targetAddr) {
     /*
      * Get a new access token and retry. Retry is needed in 2 cases. 1)
@@ -1475,7 +1478,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @param corruptedBlockMap map of corrupted blocks
    * @param dataNodeCount number of data nodes who contains the block replicas
    */
-  private void reportCheckSumFailure(
+  protected void reportCheckSumFailure(
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
       int dataNodeCount) {
     if (corruptedBlockMap.isEmpty()) {
@@ -1672,7 +1675,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     }
   }
 
-  private void closeCurrentBlockReader() {
+  protected void closeCurrentBlockReaders() {
     if (blockReader == null) return;
     // Close the current block reader so that the new caching settings can
     // take effect immediately.
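The rename from closeCurrentBlockReader to closeCurrentBlockReaders (here and at the remaining call sites below) anticipates subclasses that hold one BlockReader per internal block of a block group instead of a single reader, which is exactly what DFSStripedInputStream does later in this patch. A minimal sketch of that close-all pattern, using a plain java.io.Closeable array and an assumed group width of 6 rather than the real HDFS types and constants:

    import java.io.Closeable;
    import java.io.IOException;

    class ReaderGroup {
      // One reader per internal block of the current block group (6 is an assumed width).
      private final Closeable[] readers = new Closeable[6];

      // Close every open reader; a failure on one reader must not skip the rest.
      void closeAll() {
        for (int i = 0; i < readers.length; i++) {
          if (readers[i] == null) {
            continue;
          }
          try {
            readers[i].close();
          } catch (IOException e) {
            System.err.println("error closing reader " + i + ": " + e);
          }
          readers[i] = null;
        }
      }
    }

Closing each reader inside its own try/catch mirrors the override added in DFSStripedInputStream below: a failure while closing one internal-block reader must not leave the remaining readers open.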
@@ -1692,7 +1695,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       this.cachingStrategy =
           new CachingStrategy.Builder(this.cachingStrategy).setReadahead(readahead).build();
     }
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
   }
 
   @Override
@@ -1702,7 +1705,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       this.cachingStrategy =
           new CachingStrategy.Builder(this.cachingStrategy).setDropBehind(dropBehind).build();
     }
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
   }
 
   /**
@@ -1860,6 +1863,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
 
   @Override
   public synchronized void unbuffer() {
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index d0e2b6822d0..fe9e101c26f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -18,20 +18,21 @@
 package org.apache.hadoop.hdfs;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
-import org.apache.hadoop.hdfs.protocol.ECInfo;
-import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -125,6 +126,9 @@ public class DFSStripedInputStream extends DFSInputStream {
     return results;
   }
 
+  private final short groupSize = HdfsConstants.NUM_DATA_BLOCKS;
+  private BlockReader[] blockReaders = null;
+  private DatanodeInfo[] currentNodes = null;
   private final int cellSize;
   private final short dataBlkNum;
   private final short parityBlkNum;
@@ -143,13 +147,285 @@ public class DFSStripedInputStream extends DFSInputStream {
 
   @Override
   public synchronized int read(final ByteBuffer buf) throws IOException {
-    throw new UnsupportedActionException("Stateful read is not supported");
+    ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf);
+    TraceScope scope =
+        dfsClient.getPathTraceScope("DFSInputStream#byteBufferRead", src);
+    try {
+      return readWithStrategy(byteBufferReader, 0, buf.remaining());
+    } finally {
+      scope.close();
+    }
+  }
+
+  /**
+   * When seeking into a new block group, create blockReader for each internal
+   * block in the group.
+   */
+  @VisibleForTesting
+  private synchronized DatanodeInfo[] blockSeekTo(long target)
+      throws IOException {
+    if (target >= getFileLength()) {
+      throw new IOException("Attempted to read past end of file");
+    }
+
+    // Will be getting a new BlockReader.
+    closeCurrentBlockReaders();
+
+    // Connect to best DataNode for desired Block, with potential offset
+    DatanodeInfo[] chosenNodes = new DatanodeInfo[groupSize];
+    int refetchToken = 1; // only need to get a new access token once
+    int refetchEncryptionKey = 1; // only need to get a new encryption key once
+
+    // Compute desired striped block group
+    LocatedStripedBlock targetBlockGroup = getBlockGroupAt(target);
+
+    // Update current position
+    this.pos = target;
+    this.blockEnd = targetBlockGroup.getStartOffset() +
+        targetBlockGroup.getBlockSize() - 1;
+
+    long offsetIntoBlockGroup = target - targetBlockGroup.getStartOffset();
+    LocatedBlock[] targetBlocks = StripedBlockUtil.parseStripedBlockGroup(
+        targetBlockGroup, cellSize, dataBlkNum, parityBlkNum);
+    // The purpose is to get start offset into each block
+    ReadPortion[] readPortions = planReadPortions(groupSize, cellSize,
+        offsetIntoBlockGroup, 0, 0);
+    while (true) {
+      int i = 0;
+      InetSocketAddress targetAddr = null;
+      try {
+        blockReaders = new BlockReader[groupSize];
+        for (i = 0; i < groupSize; i++) {
+          LocatedBlock targetBlock = targetBlocks[i];
+          if (targetBlock == null) {
+            continue;
+          }
+          long offsetIntoBlock = readPortions[i].startOffsetInBlock;
+          DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null);
+          chosenNodes[i] = retval.info;
+          targetAddr = retval.addr;
+          StorageType storageType = retval.storageType;
+
+          ExtendedBlock blk = targetBlock.getBlock();
+          Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
+          CachingStrategy curCachingStrategy;
+          boolean shortCircuitForbidden;
+          synchronized(infoLock) {
+            curCachingStrategy = cachingStrategy;
+            shortCircuitForbidden = shortCircuitForbidden();
+          }
+          blockReaders[i] = new BlockReaderFactory(dfsClient.getConf()).
+              setInetSocketAddress(targetAddr).
+              setRemotePeerFactory(dfsClient).
+              setDatanodeInfo(chosenNodes[i]).
+              setStorageType(storageType).
+              setFileName(src).
+              setBlock(blk).
+              setBlockToken(accessToken).
+              setStartOffset(offsetIntoBlock).
+              setVerifyChecksum(verifyChecksum).
+              setClientName(dfsClient.clientName).
+              setLength(blk.getNumBytes() - offsetIntoBlock).
+              setCachingStrategy(curCachingStrategy).
+              setAllowShortCircuitLocalReads(!shortCircuitForbidden).
+              setClientCacheContext(dfsClient.getClientContext()).
+              setUserGroupInformation(dfsClient.ugi).
+              setConfiguration(dfsClient.getConfiguration()).
+              build();
+        }
+        currentLocatedBlock = targetBlockGroup;
+        return chosenNodes;
+      } catch (IOException ex) {
+        // Retry in case of encryption key or token exceptions. Otherwise throw
+        // IOException: since each internal block is singly replicated, it's
+        // not meaningful trying to locate another replica.
+        if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
+          DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+              + "encryption key was invalid when connecting to " + targetAddr
+              + " : " + ex);
+          // The encryption key used is invalid.
+          refetchEncryptionKey--;
+          dfsClient.clearDataEncryptionKey();
+        } else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
+          refetchToken--;
+          fetchBlockAt(target);
+        } else {
+          DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
+              + ", add to deadNodes and continue. " + ex, ex);
+          // Put chosen node into dead list and throw exception
+          addToDeadNodes(chosenNodes[i]);
+          throw ex;
+        }
+      }
+    }
+  }
+
+  @Override
+  protected void closeCurrentBlockReaders() {
+    if (blockReaders == null || blockReaders.length == 0) {
+      return;
+    }
+    for (int i = 0; i < groupSize; i++) {
+      if (blockReaders[i] == null) {
+        continue;
+      }
+      try {
+        blockReaders[i].close();
+      } catch (IOException e) {
+        DFSClient.LOG.error("error closing blockReader", e);
+      }
+      blockReaders[i] = null;
+    }
+    blockEnd = -1;
   }
 
+  /**
+   * Extend the super method with the logic of switching between cells.
+   * When reaching the end of a cell, proceed to the next cell and read it
+   * with the next blockReader.
+   */
   @Override
-  public synchronized int read(final byte buf[], int off, int len)
+  protected synchronized int readWithStrategy(ReaderStrategy strategy,
+      int off, int len) throws IOException {
+    dfsClient.checkOpen();
+    if (closed.get()) {
+      throw new IOException("Stream closed");
+    }
+    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap
+        = new HashMap<>();
+    failures = 0;
+    if (pos < getFileLength()) {
+      int retries = 2;
+      /** Index of the target block in a stripe to read from */
+      int idxInGroup = (int) ((pos / cellSize) % dataBlkNum);
+      while (retries > 0) {
+        try {
+          // currentNode can be left as null if previous read had a checksum
+          // error on the same block. See HDFS-3067
+          if (pos > blockEnd || currentNodes == null) {
+            currentNodes = blockSeekTo(pos);
+          }
+          int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
+          synchronized(infoLock) {
+            if (locatedBlocks.isLastBlockComplete()) {
+              realLen = (int) Math.min(realLen,
+                  locatedBlocks.getFileLength() - pos);
+            }
+          }
+
+          /** Number of bytes already read into buffer */
+          int result = 0;
+          while (result < realLen) {
+            /**
+             * Temporary position into the file; {@link pos} might not proceed
+             * to this temporary position in case of exceptions.
+             */
+            long tmpPos = pos + result;
+            /** Start and end offsets of a cell in the file */
+            long cellStart = (tmpPos / cellSize) * cellSize;
+            long cellEnd = cellStart + cellSize - 1;
+
+            /** Number of bytes to read from the current cell */
+            int realLenInCell = (int) Math.min(realLen - result,
+                cellEnd - tmpPos + 1L);
+            assert realLenInCell > 0 : "Temporary position shouldn't be "
+                + "after cellEnd";
+            // Read from one blockReader up to the cell boundary
+            int cellRet = readBuffer(blockReaders[idxInGroup],
+                currentNodes[idxInGroup], strategy, off + result,
+                realLenInCell);
+            if (cellRet >= 0) {
+              result += cellRet;
+              if (cellRet < realLenInCell) {
+                // A short read indicates the current blockReader buffer is
+                // already drained. Should return from the read call. Otherwise
+                // should proceed to the next cell.
+                break;
+              }
+            } else {
+              // got an EOS from the reader though we expect more data on it.
+              throw new IOException("Unexpected EOS from the reader");
+            }
+            idxInGroup = (idxInGroup + 1) % dataBlkNum;
+          }
+
+          pos += result;
+
+          if (dfsClient.stats != null) {
+            dfsClient.stats.incrementBytesRead(result);
+          }
+          return result;
+        } catch (ChecksumException ce) {
+          throw ce;
+        } catch (IOException e) {
+          if (retries == 1) {
+            DFSClient.LOG.warn("DFS Read", e);
+          }
+          blockEnd = -1;
+          if (currentNodes[idxInGroup] != null) {
+            addToDeadNodes(currentNodes[idxInGroup]);
+          }
+          if (--retries == 0) {
+            throw e;
+          }
+        } finally {
+          // Check if need to report block replicas corruption either read
+          // was successful or ChecksumException occurred.
+          reportCheckSumFailure(corruptedBlockMap,
+              currentLocatedBlock.getLocations().length);
+        }
+      }
+    }
+    return -1;
+  }
+
+  private synchronized int readBuffer(BlockReader blockReader,
+      DatanodeInfo currentNode, ReaderStrategy readerStrategy, int off, int len)
       throws IOException {
-    throw new UnsupportedActionException("Stateful read is not supported");
+    IOException ioe;
+    while (true) {
+      try {
+        return readerStrategy.doRead(blockReader, off, len);
+      } catch ( ChecksumException ce ) {
+        DFSClient.LOG.warn("Found Checksum error for "
+            + getCurrentBlock() + " from " + currentNode
+            + " at " + ce.getPos());
+        // If current block group is corrupt, it's meaningless to retry.
+        // TODO: this should trigger decoding logic (HDFS-7678)
+        throw ce;
+      } catch ( IOException e ) {
+        ioe = e;
+      }
+
+      boolean sourceFound = seekToBlockSource(pos);
+      if (!sourceFound) {
+        throw ioe;
+      }
+    }
+  }
+
+  private boolean seekToBlockSource(long targetPos)
+      throws IOException {
+    currentNodes = blockSeekTo(targetPos);
+    return true;
+  }
+
+  protected class ByteBufferStrategy extends DFSInputStream.ByteBufferStrategy {
+    ByteBufferStrategy(ByteBuffer buf) {
+      super(buf);
+    }
+
+    @Override
+    public int doRead(BlockReader blockReader, int off, int len)
+        throws ChecksumException, IOException {
+      int oldlimit = buf.limit();
+      if (buf.remaining() > len) {
+        buf.limit(buf.position() + len);
+      }
+      int ret = super.doRead(blockReader, off, len);
+      buf.limit(oldlimit);
+      return ret;
+    }
   }
 
   /**
@@ -188,8 +464,11 @@ public class DFSStripedInputStream extends DFSInputStream {
         dataBlkNum, idx);
   }
 
-  private LocatedBlock getBlockGroupAt(long offset) throws IOException {
-    return super.getBlockAt(offset);
+  private LocatedStripedBlock getBlockGroupAt(long offset) throws IOException {
+    LocatedBlock lb = super.getBlockAt(offset);
+    assert lb instanceof LocatedStripedBlock : "NameNode" +
+        " should return a LocatedStripedBlock for a striped file";
+    return (LocatedStripedBlock)lb;
   }
 
   /**
@@ -206,10 +485,8 @@ public class DFSStripedInputStream extends DFSInputStream {
     int len = (int) (end - start + 1);
 
     // Refresh the striped block group
-    LocatedBlock block = getBlockGroupAt(blockStartOffset);
-    assert block instanceof LocatedStripedBlock : "NameNode" +
-        " should return a LocatedStripedBlock for a striped file";
-    LocatedStripedBlock blockGroup = (LocatedStripedBlock) block;
+    LocatedStripedBlock blockGroup = getBlockGroupAt(blockStartOffset);
+
     // Planning the portion of I/O for each shard
     ReadPortion[] readPortions = planReadPortions(dataBlkNum, cellSize, start,
@@ -308,7 +585,7 @@ public class DFSStripedInputStream extends DFSInputStream {
    *   +------------------------------------------------------+
    */
   private long startOffsetInBlock = 0;
-  private long readLength = 0;
+  private int readLength = 0;
   private final List<Integer> offsetsInBuf = new ArrayList<>();
   private final List<Integer> lengths = new ArrayList<>();
@@ -328,7 +605,7 @@ public class DFSStripedInputStream extends DFSInputStream {
       return lens;
     }
 
-    long getReadLength() {
+    int getReadLength() {
       return readLength;
     }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index 73c735093f9..cf109818c33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -28,6 +28,7 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 public class TestDFSStripedInputStream {
   private static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
@@ -165,6 +166,7 @@ public class TestDFSStripedInputStream {
     Assert.assertEquals("File length should be the same",
         writeBytes, fileLength);
 
+    // pread
     try (DFSStripedInputStream dis =
         new DFSStripedInputStream(fs.getClient(), src, true)) {
       byte[] buf = new byte[writeBytes + 100];
@@ -176,5 +178,46 @@ public class TestDFSStripedInputStream {
         Assert.assertEquals("Byte at i should be the same", getByte(i), buf[i]);
       }
     }
+
+    // stateful read with byte array
+    try (DFSStripedInputStream dis =
+        new DFSStripedInputStream(fs.getClient(), src, true)) {
+      byte[] buf = new byte[writeBytes + 100];
+      int readLen = 0;
+      int ret;
+      do {
+        ret = dis.read(buf, readLen, buf.length - readLen);
+        if (ret > 0) {
+          readLen += ret;
+        }
+      } while (ret >= 0);
+
+      readLen = readLen >= 0 ? readLen : 0;
+      Assert.assertEquals("The length of file should be the same as the write size",
+          writeBytes, readLen);
+      for (int i = 0; i < writeBytes; i++) {
+        Assert.assertEquals("Byte at i should be the same", getByte(i), buf[i]);
+      }
+    }
+
+    // stateful read with ByteBuffer
+    try (DFSStripedInputStream dis =
+        new DFSStripedInputStream(fs.getClient(), src, true)) {
+      ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100);
+      int readLen = 0;
+      int ret;
+      do {
+        ret = dis.read(buf);
+        if (ret > 0) {
+          readLen += ret;
+        }
+      } while (ret >= 0);
+      readLen = readLen >= 0 ? readLen : 0;
+      Assert.assertEquals("The length of file should be the same as the write size",
+          writeBytes, readLen);
+      for (int i = 0; i < writeBytes; i++) {
+        Assert.assertEquals("Byte at i should be the same", getByte(i), buf.array()[i]);
+      }
+    }
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
index b0631cedb6d..d980bd6959a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -28,7 +30,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
@@ -38,6 +39,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 
@@ -52,19 +54,21 @@ public class TestReadStripedFile {
   private Path filePath = new Path(dirPath, "file");
   private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
   private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
-  private final short BLK_GROUP_SIZE = DATA_BLK_NUM + PARITY_BLK_NUM;
   private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
   private final int NUM_STRIPE_PER_BLOCK = 2;
-  private final int BLOCKSIZE = NUM_STRIPE_PER_BLOCK * DATA_BLK_NUM * CELLSIZE;
+  private final int INTERNAL_BLOCK_SIZE = NUM_STRIPE_PER_BLOCK * CELLSIZE;
+  private final int BLOCK_GROUP_SIZE = DATA_BLK_NUM * INTERNAL_BLOCK_SIZE;
 
   @Before
   public void setup() throws IOException {
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, INTERNAL_BLOCK_SIZE);
     SimulatedFSDataset.setFactory(conf);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(BLK_GROUP_SIZE)
-        .build();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        DATA_BLK_NUM + PARITY_BLK_NUM).build();
     cluster.waitActive();
     fs = cluster.getFileSystem();
+    fs.mkdirs(dirPath);
+    fs.getClient().createErasureCodingZone(dirPath.toString(), null);
   }
 
@@ -80,10 +84,10 @@ public class TestReadStripedFile {
   @Test
   public void testGetBlock() throws Exception {
     final int numBlocks = 4;
-    DFSTestUtil.createStripedFile(cluster, filePath, dirPath, numBlocks,
-        NUM_STRIPE_PER_BLOCK, true);
+    DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+        NUM_STRIPE_PER_BLOCK, false);
     LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
-        filePath.toString(), 0, BLOCKSIZE * numBlocks);
+        filePath.toString(), 0, BLOCK_GROUP_SIZE * numBlocks);
     final DFSStripedInputStream in =
         new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
@@ -103,11 +107,11 @@ public class TestReadStripedFile {
 
   @Test
   public void testPread() throws Exception {
-    final int numBlocks = 4;
-    DFSTestUtil.createStripedFile(cluster, filePath, dirPath, numBlocks,
-        NUM_STRIPE_PER_BLOCK, true);
+    final int numBlocks = 2;
+    DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+        NUM_STRIPE_PER_BLOCK, false);
     LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
-        filePath.toString(), 0, BLOCKSIZE);
+        filePath.toString(), 0, BLOCK_GROUP_SIZE);
     assert lbs.get(0) instanceof LocatedStripedBlock;
     LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0));
@@ -121,11 +125,89 @@ public class TestReadStripedFile {
     }
     DFSStripedInputStream in =
         new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
-    int readSize = BLOCKSIZE;
+    int readSize = BLOCK_GROUP_SIZE;
     byte[] readBuffer = new byte[readSize];
     int ret = in.read(0, readBuffer, 0, readSize);
     assertEquals(readSize, ret);
     // TODO: verify read results with patterned data from HDFS-8117
   }
+
+  @Test
+  public void testStatefulRead() throws Exception {
+    testStatefulRead(false, false);
+    testStatefulRead(true, false);
+    testStatefulRead(true, true);
+  }
+
+  private void testStatefulRead(boolean useByteBuffer, boolean cellMisalignPacket)
+      throws Exception {
+    final int numBlocks = 2;
+    final int fileSize = numBlocks * BLOCK_GROUP_SIZE;
+    if (cellMisalignPacket) {
+      conf.setInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT + 1);
+      tearDown();
+      setup();
+    }
+    DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+        NUM_STRIPE_PER_BLOCK, false);
+    LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+        filePath.toString(), 0, fileSize);
+
+    assert lbs.getLocatedBlocks().size() == numBlocks;
+    for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+      assert lb instanceof LocatedStripedBlock;
+      LocatedStripedBlock bg = (LocatedStripedBlock)(lb);
+      for (int i = 0; i < DATA_BLK_NUM; i++) {
+        Block blk = new Block(bg.getBlock().getBlockId() + i,
+            NUM_STRIPE_PER_BLOCK * CELLSIZE,
+            bg.getBlock().getGenerationStamp());
+        blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
+        cluster.injectBlocks(i, Arrays.asList(blk),
+            bg.getBlock().getBlockPoolId());
+      }
+    }
+
+    DFSStripedInputStream in =
+        new DFSStripedInputStream(fs.getClient(), filePath.toString(),
+            false);
+
+    byte[] expected = new byte[fileSize];
+
+    for (LocatedBlock bg : lbs.getLocatedBlocks()) {
+      /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
+      for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
+        for (int j = 0; j < DATA_BLK_NUM; j++) {
+          for (int k = 0; k < CELLSIZE; k++) {
+            int posInBlk = i * CELLSIZE + k;
+            int posInFile = (int) bg.getStartOffset() +
+                i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k;
+            expected[posInFile] = SimulatedFSDataset.simulatedByte(
+                new Block(bg.getBlock().getBlockId() + j), posInBlk);
+          }
+        }
+      }
+    }
+
+    if (useByteBuffer) {
+      ByteBuffer readBuffer = ByteBuffer.allocate(fileSize);
+      int done = 0;
+      while (done < fileSize) {
+        int ret = in.read(readBuffer);
+        assertTrue(ret > 0);
+        done += ret;
+      }
+      assertArrayEquals(expected, readBuffer.array());
+    } else {
+      byte[] readBuffer = new byte[fileSize];
+      int done = 0;
+      while (done < fileSize) {
+        int ret = in.read(readBuffer, done, fileSize - done);
+        assertTrue(ret > 0);
+        done += ret;
+      }
+      assertArrayEquals(expected, readBuffer);
+    }
+    fs.delete(filePath, true);
+  }
 }
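Both the cell-switching loop in DFSStripedInputStream#readWithStrategy and the expected-buffer computation in testStatefulRead rely on the same mapping from a logical offset inside a block group to an internal block and an offset within it: the byte at offsetInGroup sits in cell offsetInGroup / cellSize, which belongs to internal block (offsetInGroup / cellSize) % dataBlkNum, at offset stripe * cellSize + offsetInGroup % cellSize inside that block. The following standalone sketch illustrates the arithmetic; the cell size and data-block count are assumed example values, not the HdfsConstants defaults:

    // Illustrative sketch of the striped-layout offset arithmetic used above.
    // CELL_SIZE and DATA_BLK_NUM are assumed example values.
    public class StripedOffsetExample {
      static final int CELL_SIZE = 64 * 1024;
      static final int DATA_BLK_NUM = 6;

      /** Index of the internal block holding the byte at offsetInGroup. */
      static int blockIndex(long offsetInGroup) {
        long cellIdx = offsetInGroup / CELL_SIZE;
        return (int) (cellIdx % DATA_BLK_NUM);
      }

      /** Offset of that byte inside its internal block. */
      static long offsetInBlock(long offsetInGroup) {
        long stripeIdx = offsetInGroup / ((long) CELL_SIZE * DATA_BLK_NUM);
        return stripeIdx * CELL_SIZE + offsetInGroup % CELL_SIZE;
      }

      public static void main(String[] args) {
        long off = 3L * CELL_SIZE + 100;  // 100 bytes into the fourth cell
        System.out.println("internal block = " + blockIndex(off)
            + ", offset in block = " + offsetInBlock(off));
        // prints: internal block = 3, offset in block = 100
      }
    }

Each time a read reaches a cell boundary, readWithStrategy advances the internal-block index by one modulo the number of data blocks, which corresponds to blockIndex moving to the next cell in the stripe.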