diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index a160520fa16..9b05d2bc701 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -283,3 +283,6 @@
 
     HDFS-8328. Follow-on to update decode for DataNode striped blocks
     reconstruction. (yliu)
+
+    HDFS-8319. Erasure Coding: support decoding for stateful read.
+    (Jing Zhao via zhz)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 6102edfe92a..1b5705a8b23 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -1639,7 +1639,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   /**
    */
   @Override
-  public synchronized long getPos() throws IOException {
+  public synchronized long getPos() {
     return pos;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 2e26cca094a..bf99f17d7fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -30,12 +30,13 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.ByteBufferPool;
 
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.convertIndex4Decode;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.divideByteRangeIntoStripes;
-import static org.apache.hadoop.hdfs.util.StripedBlockUtil.initDecodeInputs;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.finalizeDecodeInputs;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.decodeAndFillBuffer;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getNextCompletedStripedRead;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getStartOffsetsForInternalBlocks;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.initDecodeInputs;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.parseStripedBlockGroup;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
@@ -55,6 +56,7 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.Set;
 import java.util.Collection;
@@ -63,8 +65,6 @@ import java.util.HashMap;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 
@@ -113,11 +113,19 @@ public class DFSStripedInputStream extends DFSInputStream {
   }
 
   private final BlockReader[] blockReaders;
+  /**
+   * When initializing block readers, their starting offsets are all set to the
+   * same number: the smallest internal block offset among all the readers.
+   * This is because it is possible that for some internal blocks we have to
+   * read "backwards" for decoding purposes. We thus use this offset array to
+   * track the offset of each block reader so that we can skip data if
+   * necessary.
+   */
+  private final long[] blockReaderOffsets;
   private final DatanodeInfo[] currentNodes;
   private final int cellSize;
   private final short dataBlkNum;
   private final short parityBlkNum;
-  private final short groupSize;
+  private final int groupSize;
   /** the buffer for a complete stripe */
   private ByteBuffer curStripeBuf;
   private final ECSchema schema;
@@ -128,7 +136,8 @@ public class DFSStripedInputStream extends DFSInputStream {
    * block group
    */
   private StripeRange curStripeRange;
-  private final CompletionService<Integer> readingService;
+  private final CompletionService<Void> readingService;
+  private ReaderRetryPolicy retry;
 
   DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
       ECSchema schema, int cellSize) throws IOException {
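The comment on blockReaderOffsets describes the key trick of this change: every data-block reader in the group is opened at the smallest start offset any internal block needs, and each reader's actual position is tracked so it can skip forward lazily when its data is first needed at a later offset. A minimal sketch of that bookkeeping, written against plain java.io streams rather than the HDFS BlockReader API (all names here are illustrative):

```java
import java.io.IOException;
import java.io.InputStream;

/** Sketch: open a reader at a common minimum offset, skip forward later. */
class OffsetTrackedReader {
  private final InputStream in;
  private long currentOffset; // how far this reader has already advanced

  OffsetTrackedReader(InputStream in, long startOffset) {
    this.in = in;
    this.currentOffset = startOffset;
  }

  /** Skip forward so the next read starts at targetOffset. */
  void seekForwardTo(long targetOffset) throws IOException {
    if (targetOffset < currentOffset) {
      throw new IOException("cannot read backwards on a streaming reader");
    }
    long toSkip = targetOffset - currentOffset;
    while (toSkip > 0) {
      long skipped = in.skip(toSkip);
      if (skipped <= 0) {
        throw new IOException("unexpected EOF while skipping");
      }
      toSkip -= skipped;
    }
    currentOffset = targetOffset;
  }
}
```

Opening at the group-wide minimum keeps a failed stripe decodable: a reader that later turns out to be needed for "earlier" bytes never has to seek backwards, only others skip forward.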
@@ -139,8 +148,9 @@
     this.cellSize = cellSize;
     dataBlkNum = (short) schema.getNumDataUnits();
     parityBlkNum = (short) schema.getNumParityUnits();
-    groupSize = dataBlkNum;
+    groupSize = dataBlkNum + parityBlkNum;
     blockReaders = new BlockReader[groupSize];
+    blockReaderOffsets = new long[groupSize];
     currentNodes = new DatanodeInfo[groupSize];
     curStripeRange = new StripeRange(0, 0);
     readingService =
@@ -197,20 +207,21 @@
     // The purpose is to get start offset into each block.
     long[] offsetsForInternalBlocks = getStartOffsetsForInternalBlocks(schema,
         cellSize, targetBlockGroup, offsetIntoBlockGroup);
-    Preconditions.checkNotNull(offsetsForInternalBlocks);
+    Preconditions.checkState(
+        offsetsForInternalBlocks.length == dataBlkNum + parityBlkNum);
+    long minOffset = offsetsForInternalBlocks[dataBlkNum];
 
-    final ReaderRetryPolicy retry = new ReaderRetryPolicy();
-    for (int i = 0; i < groupSize; i++) {
+    retry = new ReaderRetryPolicy();
+    for (int i = 0; i < dataBlkNum; i++) {
       LocatedBlock targetBlock = targetBlocks[i];
       if (targetBlock != null) {
-        long offsetInBlock = offsetsForInternalBlocks[i] < 0 ?
-            0 : offsetsForInternalBlocks[i];
         DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null);
         if (retval != null) {
           currentNodes[i] = retval.info;
           blockReaders[i] = getBlockReaderWithRetry(targetBlock,
-              offsetInBlock, targetBlock.getBlockSize() - offsetInBlock,
+              minOffset, targetBlock.getBlockSize() - minOffset,
               retval.addr, retval.storageType, retval.info, target, retry);
+          blockReaderOffsets[i] = minOffset;
         }
       }
     }
@@ -260,19 +271,24 @@
       return;
     }
     for (int i = 0; i < groupSize; i++) {
-      if (blockReaders[i] != null) {
-        try {
-          blockReaders[i].close();
-        } catch (IOException e) {
-          DFSClient.LOG.error("error closing blockReader", e);
-        }
-        blockReaders[i] = null;
-      }
+      closeReader(i);
       currentNodes[i] = null;
     }
     blockEnd = -1;
   }
 
+  private void closeReader(int index) {
+    if (blockReaders[index] != null) {
+      try {
+        blockReaders[index].close();
+      } catch (IOException e) {
+        DFSClient.LOG.error("error closing blockReader " + index, e);
+      }
+      blockReaders[index] = null;
+    }
+    blockReaderOffsets[index] = 0;
+  }
+
   private long getOffsetInBlockGroup() {
     return getOffsetInBlockGroup(pos);
   }
@@ -300,54 +316,81 @@
     curStripeRange = new StripeRange(offsetInBlockGroup,
         stripeLimit - stripeBufOffset);
 
-    final int startCell = stripeBufOffset / cellSize;
-    final int numCell = (stripeLimit - 1) / cellSize + 1;
-
-    // read the whole stripe in parallel
-    Map<Future<Integer>, Integer> futures = new HashMap<>();
-    for (int i = startCell; i < numCell; i++) {
-      int bufPos = i == startCell ? stripeBufOffset : cellSize * i;
-      curStripeBuf.position(bufPos);
-      curStripeBuf.limit(Math.min(cellSize * (i + 1), stripeLimit));
-      ByteBuffer buf = curStripeBuf.slice();
-      ByteBufferStrategy strategy = new ByteBufferStrategy(buf);
-      final int targetLength = buf.remaining();
-      Callable<Integer> readCallable = readCell(blockReaders[i],
-          currentNodes[i], strategy, targetLength, corruptedBlockMap);
-      Future<Integer> request = readingService.submit(readCallable);
-      futures.put(request, i);
-    }
-    while (!futures.isEmpty()) {
-      try {
-        waitNextCompletion(readingService, futures);
-        // TODO: decode and record bad reader if necessary
-      } catch (InterruptedException ignored) {
-        // ignore and retry
-      }
+    LocatedStripedBlock blockGroup = (LocatedStripedBlock) currentLocatedBlock;
+    AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(schema, cellSize,
+        blockGroup, offsetInBlockGroup,
+        offsetInBlockGroup + curStripeRange.length - 1, curStripeBuf);
+    // TODO handle null elements in blks (e.g., NN does not know locations for
+    // all the internal blocks)
+    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
+        blockGroup, cellSize, dataBlkNum, parityBlkNum);
+    // read the whole stripe
+    for (AlignedStripe stripe : stripes) {
+      // Parse group to get chosen DN location
+      StripeReader sreader = new StatefulStripeReader(readingService, stripe,
+          blks);
+      sreader.readStripe(blks, corruptedBlockMap);
     }
+    curStripeBuf.position(stripeBufOffset);
+    curStripeBuf.limit(stripeLimit);
   }
 
-  private Callable<Integer> readCell(final BlockReader reader,
-      final DatanodeInfo datanode, final ByteBufferStrategy strategy,
+  private Callable<Void> readCell(final BlockReader reader,
+      final DatanodeInfo datanode, final long currentReaderOffset,
+      final long targetReaderOffset, final ByteBufferStrategy strategy,
       final int targetLength,
       final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
-    return new Callable<Integer>() {
+    return new Callable<Void>() {
       @Override
-      public Integer call() throws Exception {
+      public Void call() throws Exception {
+        // reader can be null if getBlockReaderWithRetry failed or
+        // the reader hit an exception before
+        if (reader == null) {
+          throw new IOException("The BlockReader is null. " +
+              "The BlockReader creation failed or the reader hit an exception.");
+        }
+        Preconditions.checkState(currentReaderOffset <= targetReaderOffset);
+        if (currentReaderOffset < targetReaderOffset) {
+          long skipped = reader.skip(targetReaderOffset - currentReaderOffset);
+          Preconditions.checkState(
+              skipped == targetReaderOffset - currentReaderOffset);
+        }
         int result = 0;
         while (result < targetLength) {
-          int ret = readBuffer(reader, datanode, strategy, corruptedBlockMap);
+          int ret = readToBuffer(reader, datanode, strategy, corruptedBlockMap);
           if (ret < 0) {
             throw new IOException("Unexpected EOS from the reader");
          }
           result += ret;
         }
         updateReadStatistics(readStatistics, targetLength, reader);
-        return result;
+        return null;
       }
     };
   }
 
+  private int readToBuffer(BlockReader blockReader,
+      DatanodeInfo currentNode, ByteBufferStrategy readerStrategy,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
+    try {
+      return readerStrategy.doRead(blockReader, 0, 0);
+    } catch (ChecksumException ce) {
+      DFSClient.LOG.warn("Found Checksum error for "
+          + getCurrentBlock() + " from " + currentNode
+          + " at " + ce.getPos());
+      // we want to remember which block replicas we have tried
+      addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
+          corruptedBlockMap);
+      throw ce;
+    } catch (IOException e) {
+      DFSClient.LOG.warn("Exception while reading from "
+          + getCurrentBlock() + " of " + src + " from "
+          + currentNode, e);
+      throw e;
+    }
+  }
+
   /**
    * Seek to a new arbitrary location
    */
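readCell now has a two-part contract: first bring the reader from its tracked offset up to the cell's target offset (the bytes in between may already have been consumed for an earlier decode), then read until exactly targetLength bytes arrive, treating a premature EOF as an error. The same contract in a self-contained sketch; the stream type and helper names are illustrative, not the HDFS ones:

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

/** Sketch: position a reader by skipping, then read an exact byte count. */
final class CellReads {
  private CellReads() {}

  static void readCellFully(InputStream reader, long currentOffset,
      long targetOffset, byte[] dst, int off, int len) throws IOException {
    // The reader may sit before the cell we need (it was opened at the
    // smallest offset in the group), so skip the gap first.
    long toSkip = targetOffset - currentOffset;
    while (toSkip > 0) {
      long skipped = reader.skip(toSkip);
      if (skipped <= 0) {
        throw new EOFException("unexpected EOF while skipping");
      }
      toSkip -= skipped;
    }
    // A single read() may return fewer bytes than requested; loop until the
    // whole cell is in, and treat EOF as an error since the cell must exist.
    int done = 0;
    while (done < len) {
      int n = reader.read(dst, off + done, len - done);
      if (n < 0) {
        throw new EOFException("unexpected EOS from the reader");
      }
      done += n;
    }
  }
}
```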
@@ -416,7 +459,7 @@
         if (!curStripeRange.include(getOffsetInBlockGroup())) {
           readOneStripe(corruptedBlockMap);
         }
-        int ret = copy(strategy, off + result, realLen - result);
+        int ret = copyToTargetBuf(strategy, off + result, realLen - result);
         result += ret;
         pos += ret;
       }
@@ -434,26 +477,6 @@
     return -1;
   }
 
-  private int readBuffer(BlockReader blockReader,
-      DatanodeInfo currentNode, ByteBufferStrategy readerStrategy,
-      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
-    try {
-      return readerStrategy.doRead(blockReader, 0, 0);
-    } catch ( ChecksumException ce ) {
-      DFSClient.LOG.warn("Found Checksum error for "
-          + getCurrentBlock() + " from " + currentNode
-          + " at " + ce.getPos());
-      // we want to remember which block replicas we have tried
-      addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
-          corruptedBlockMap);
-    } catch (IOException e) {
-      DFSClient.LOG.warn("Exception while reading from "
-          + getCurrentBlock() + " of " + src + " from "
-          + currentNode, e);
-    }
-    return -1;
-  }
-
   /**
    * Copy the data from {@link #curStripeBuf} into the given buffer
    * @param strategy the ReaderStrategy containing the given buffer
    * @param offset target offset
    * @param length target length
    * @return number of bytes copied
    */
-  private int copy(ReaderStrategy strategy, int offset, int length) {
+  private int copyToTargetBuf(ReaderStrategy strategy, int offset, int length) {
     final long offsetInBlk = getOffsetInBlockGroup();
     int bufOffset = getStripedBufOffset(offsetInBlk);
     curStripeBuf.position(bufOffset);
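copyToTargetBuf is a plain ByteBuffer window copy: position and limit are set around the caller's bytes in the stripe buffer, which are then bulk-copied out. A standalone sketch of the pattern, assuming the stripe buffer's limit already sits at the end of valid data (names are illustrative):

```java
import java.nio.ByteBuffer;

/** Sketch: copy a window of a stripe buffer into a caller's byte array. */
final class StripeBufCopy {
  private StripeBufCopy() {}

  static int copyWindow(ByteBuffer stripeBuf, int bufOffset,
      byte[] dst, int dstOff, int wanted) {
    ByteBuffer window = stripeBuf.duplicate(); // don't disturb the original
    window.position(bufOffset);
    // copy no more than what remains in the stripe buffer
    int length = Math.min(wanted, window.remaining());
    window.limit(bufOffset + length);
    window.get(dst, dstOff, length);
    return length; // the caller advances its position by this amount
  }
}
```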
@@ -519,120 +542,19 @@
 
     AlignedStripe[] stripes = divideByteRangeIntoStripes(schema, cellSize,
         blockGroup, start, end, buf, offset);
+    CompletionService<Void> readService = new ExecutorCompletionService<>(
+        dfsClient.getStripedReadsThreadPool());
+    // TODO handle null elements in blks (e.g., NN does not know locations for
+    // all the internal blocks)
+    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
+        blockGroup, cellSize, dataBlkNum, parityBlkNum);
     for (AlignedStripe stripe : stripes) {
-      fetchOneStripe(blockGroup, buf, stripe, corruptedBlockMap);
+      // Parse group to get chosen DN location
+      StripeReader preader = new PositionStripeReader(readService, stripe);
+      preader.readStripe(blks, corruptedBlockMap);
     }
   }
 
-  private void fetchOneStripe(LocatedStripedBlock blockGroup,
-      byte[] buf, AlignedStripe alignedStripe, Map<ExtendedBlock,
-      Set<DatanodeInfo>> corruptedBlockMap) throws IOException {
-    Map<Future<Void>, Integer> futures = new HashMap<>();
-    CompletionService<Void> service =
-        new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
-    if (alignedStripe.getSpanInBlock() == 0) {
-      DFSClient.LOG.warn("Trying to read an empty stripe from" + blockGroup);
-      return;
-    }
-    // Parse group to get chosen DN location
-    LocatedBlock[] blks = StripedBlockUtil.
-        parseStripedBlockGroup(blockGroup, cellSize, dataBlkNum, parityBlkNum);
-    for (short i = 0; i < dataBlkNum; i++) {
-      if (alignedStripe.chunks[i] != null
-          && alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
-        fetchOneStripingChunk(futures, service, blks[i], alignedStripe, i,
-            corruptedBlockMap);
-      }
-    }
-    // Input buffers for potential decode operation, which remains null until
-    // first read failure
-    byte[][] decodeInputs = null;
-    while (!futures.isEmpty()) {
-      try {
-        StripingChunkReadResult r = getNextCompletedStripedRead(
-            service, futures, 0);
-        if (DFSClient.LOG.isDebugEnabled()) {
-          DFSClient.LOG.debug("Read task returned: " + r + ", for stripe " + alignedStripe);
-        }
-        StripingChunk returnedChunk = alignedStripe.chunks[r.index];
-        Preconditions.checkNotNull(returnedChunk);
-        Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING);
-        if (r.state == StripingChunkReadResult.SUCCESSFUL) {
-          returnedChunk.state = StripingChunk.FETCHED;
-          alignedStripe.fetchedChunksNum++;
-          if (alignedStripe.fetchedChunksNum == dataBlkNum) {
-            clearFutures(futures.keySet());
-            break;
-          }
-        } else {
-          returnedChunk.state = StripingChunk.MISSING;
-          alignedStripe.missingChunksNum++;
-          if (alignedStripe.missingChunksNum > parityBlkNum) {
-            clearFutures(futures.keySet());
-            throw new IOException("Too many blocks are missing: " + alignedStripe);
-          }
-          // When seeing first missing block, initialize decode input buffers
-          if (decodeInputs == null) {
-            decodeInputs = initDecodeInputs(alignedStripe, dataBlkNum, parityBlkNum);
-          }
-          for (int i = 0; i < alignedStripe.chunks.length; i++) {
-            StripingChunk chunk = alignedStripe.chunks[i];
-            Preconditions.checkNotNull(chunk);
-            if (chunk.state == StripingChunk.REQUESTED && i <= dataBlkNum) {
-              fetchOneStripingChunk(futures, service, blks[i], alignedStripe, i,
-                  corruptedBlockMap);
-            }
-          }
-        }
-      } catch (InterruptedException ie) {
-        String err = "Read request interrupted";
-        DFSClient.LOG.error(err);
-        clearFutures(futures.keySet());
-        // Don't decode if read interrupted
-        throw new InterruptedIOException(err);
-      }
-    }
-
-    if (alignedStripe.missingChunksNum > 0) {
-      finalizeDecodeInputs(decodeInputs, dataBlkNum, parityBlkNum,
-          alignedStripe);
-      decodeAndFillBuffer(decodeInputs, buf, alignedStripe, dataBlkNum,
-          parityBlkNum, decoder);
-    }
-  }
-
-  /**
-   * Schedule a single read request to an internal block
-   * @param block The internal block
-   * @param index Index of the internal block in the group
-   * @param corruptedBlockMap Map of corrupted blocks
-   */
-  private void fetchOneStripingChunk(Map<Future<Void>, Integer> futures,
-      final CompletionService<Void> service, final LocatedBlock block,
-      final AlignedStripe alignedStripe, final int index,
-      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
-    DatanodeInfo loc = block.getLocations()[0];
-    StorageType type = block.getStorageTypes()[0];
-    DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr(
-        loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())),
-        type);
-    StripingChunk chunk = alignedStripe.chunks[index];
-    chunk.state = StripingChunk.PENDING;
-    Callable<Void> readCallable = getFromOneDataNode(dnAddr,
-        block, alignedStripe.getOffsetInBlock(),
-        alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock() - 1,
-        chunk.buf, chunk.getOffsets(), chunk.getLengths(),
-        corruptedBlockMap, index);
-    Future<Void> getFromDNRequest = service.submit(readCallable);
-    if (DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("Submitting striped read request for " + index +
-          ". Info of the block: " + block + ", offset in block is " +
-          alignedStripe.getOffsetInBlock() + ", end is " +
-          (alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock() - 1));
-    }
-    futures.put(getFromDNRequest, index);
-  }
-
   private Callable<Void> getFromOneDataNode(final DNAddrPair datanode,
       final LocatedBlock block, final long start, final long end,
       final byte[] buf, final int[] offsets, final int[] lengths,
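Both the positional and the stateful path now funnel per-chunk reads through a CompletionService and consume results in completion order, so the first failed chunk can trigger decode preparation while slower reads are still in flight. For reference, the java.util.concurrent pattern in isolation:

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Sketch: fan out independent reads and consume them in completion order. */
public class CompletionOrderDemo {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    CompletionService<Integer> service = new ExecutorCompletionService<>(pool);
    for (int i = 0; i < 4; i++) {
      final int id = i;
      service.submit(() -> id * id); // stand-in for one chunk read
    }
    // take() returns futures as tasks finish, not in submission order,
    // which lets the caller react to the first failure early.
    for (int i = 0; i < 4; i++) {
      Future<Integer> done = service.take();
      System.out.println("completed: " + done.get());
    }
    pool.shutdown();
  }
}
```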
@@ -655,21 +577,302 @@
     };
   }
 
-  private void waitNextCompletion(CompletionService<Integer> service,
-      Map<Future<Integer>, Integer> futures) throws InterruptedException {
-    if (futures.isEmpty()) {
-      throw new InterruptedException("Futures already empty");
+  private abstract class StripeReader {
+    final Map<Future<Void>, Integer> futures = new HashMap<>();
+    final AlignedStripe alignedStripe;
+    final CompletionService<Void> service;
+
+    StripeReader(CompletionService<Void> service, AlignedStripe alignedStripe) {
+      this.service = service;
+      this.alignedStripe = alignedStripe;
     }
-    Future<Integer> future = null;
-    try {
-      future = service.take();
-      future.get();
-      futures.remove(future);
-    } catch (ExecutionException | CancellationException e) {
-      // already logged in the Callable
-      futures.remove(future);
+
+    /** Submit a chunk read task. */
+    abstract void readChunk(final CompletionService<Void> service,
+        final LocatedBlock block, int chunkIndex,
+        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap);
+
+    /**
+     * When seeing the first missing block, initialize decode input buffers.
+     * Also prepare the reading for data blocks outside of the reading range.
+     */
+    abstract void prepareDecodeInputs() throws IOException;
+
+    /**
+     * Prepare reading for one more parity chunk.
+     */
+    abstract void prepareParityChunk() throws IOException;
+
+    abstract void decode();
+
+    abstract void updateState4SuccessRead(StripingChunkReadResult result);
+
+    /** Read the whole stripe; do decoding if necessary. */
+    void readStripe(LocatedBlock[] blocks,
+        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+        throws IOException {
+      assert alignedStripe.getSpanInBlock() > 0;
+      for (short i = 0; i < dataBlkNum; i++) {
+        if (alignedStripe.chunks[i] != null
+            && alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
+          readChunk(service, blocks[i], i, corruptedBlockMap);
+        }
+      }
+
+      // Input buffers for potential decode operation, which remains null until
+      // first read failure
+      while (!futures.isEmpty()) {
+        try {
+          StripingChunkReadResult r = getNextCompletedStripedRead(service,
+              futures, 0);
+          if (DFSClient.LOG.isDebugEnabled()) {
+            DFSClient.LOG.debug("Read task returned: " + r + ", for stripe "
+                + alignedStripe);
+          }
+          StripingChunk returnedChunk = alignedStripe.chunks[r.index];
+          Preconditions.checkNotNull(returnedChunk);
+          Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING);
+
+          if (r.state == StripingChunkReadResult.SUCCESSFUL) {
+            returnedChunk.state = StripingChunk.FETCHED;
+            alignedStripe.fetchedChunksNum++;
+            updateState4SuccessRead(r);
+            if (alignedStripe.fetchedChunksNum == dataBlkNum) {
+              clearFutures(futures.keySet());
+              break;
+            }
+          } else {
+            returnedChunk.state = StripingChunk.MISSING;
+            alignedStripe.missingChunksNum++;
+            if (alignedStripe.missingChunksNum > parityBlkNum) {
+              clearFutures(futures.keySet());
+              throw new IOException("Too many blocks are missing: "
+                  + alignedStripe);
+            }
+
+            prepareDecodeInputs();
+            prepareParityChunk();
+            // close the corresponding reader
+            closeReader(r.index);
+
+            for (int i = 0; i < alignedStripe.chunks.length; i++) {
+              StripingChunk chunk = alignedStripe.chunks[i];
+              if (chunk != null && chunk.state == StripingChunk.REQUESTED) {
+                readChunk(service, blocks[i], i, corruptedBlockMap);
+              }
+            }
+          }
+        } catch (InterruptedException ie) {
+          String err = "Read request interrupted";
+          DFSClient.LOG.error(err);
+          clearFutures(futures.keySet());
+          // Don't decode if read interrupted
+          throw new InterruptedIOException(err);
+        }
+      }
+
+      if (alignedStripe.missingChunksNum > 0) {
+        decode();
+      }
+    }
+  }
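StripeReader is a textbook template method: readStripe fixes the submit/harvest/retry skeleton once, and the two subclasses that follow fill in only the variable steps (how to issue a chunk read, how to stage decode buffers, how to decode). A stripped-down sketch of that shape, with all names hypothetical:

```java
/** Sketch: the template-method shape used by StripeReader. */
abstract class StripeReadTemplate {
  // The invariant steps live in the base class.
  final void readStripe(int chunks) throws Exception {
    for (int i = 0; i < chunks; i++) {
      readChunk(i);                  // subclass: how to issue one read
    }
    boolean sawFailure = harvest();  // base class: completion-order loop
    if (sawFailure) {
      prepareDecodeInputs();         // subclass: stage decode buffers
      decode();                      // subclass: run the erasure decoder
    }
  }

  abstract void readChunk(int index) throws Exception;
  abstract void prepareDecodeInputs() throws Exception;
  abstract void decode();

  boolean harvest() {
    return false; // placeholder: the real loop waits on a CompletionService
  }
}
```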
+
+  class PositionStripeReader extends StripeReader {
+    private byte[][] decodeInputs = null;
+
+    PositionStripeReader(CompletionService<Void> service,
+        AlignedStripe alignedStripe) {
+      super(service, alignedStripe);
+    }
+
+    @Override
+    void readChunk(final CompletionService<Void> service,
+        final LocatedBlock block, int chunkIndex,
+        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+      DatanodeInfo loc = block.getLocations()[0];
+      StorageType type = block.getStorageTypes()[0];
+      DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr(
+          loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())),
+          type);
+      StripingChunk chunk = alignedStripe.chunks[chunkIndex];
+      chunk.state = StripingChunk.PENDING;
+      Callable<Void> readCallable = getFromOneDataNode(dnAddr,
+          block, alignedStripe.getOffsetInBlock(),
+          alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock() - 1,
+          chunk.byteArray.buf(), chunk.byteArray.getOffsets(),
+          chunk.byteArray.getLengths(), corruptedBlockMap, chunkIndex);
+      Future<Void> getFromDNRequest = service.submit(readCallable);
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("Submitting striped read request for " + chunkIndex
+            + ". Info of the block: " + block + ", offset in block is "
+            + alignedStripe.getOffsetInBlock() + ", end is "
+            + (alignedStripe.getOffsetInBlock()
+                + alignedStripe.getSpanInBlock() - 1));
+      }
+      futures.put(getFromDNRequest, chunkIndex);
+    }
+
+    @Override
+    void updateState4SuccessRead(StripingChunkReadResult r) {}
+
+    @Override
+    void prepareDecodeInputs() {
+      if (decodeInputs == null) {
+        decodeInputs = initDecodeInputs(alignedStripe, dataBlkNum, parityBlkNum);
+      }
+    }
+
+    @Override
+    void prepareParityChunk() {
+      for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) {
+        if (alignedStripe.chunks[i] == null) {
+          final int decodeIndex = convertIndex4Decode(i, dataBlkNum,
+              parityBlkNum);
+          alignedStripe.chunks[i] = new StripingChunk(decodeInputs[decodeIndex]);
+          alignedStripe.chunks[i].addByteArraySlice(0,
+              (int) alignedStripe.getSpanInBlock());
+          break;
+        }
+      }
+    }
+
+    @Override
+    void decode() {
+      finalizeDecodeInputs(decodeInputs, dataBlkNum, parityBlkNum,
+          alignedStripe);
+      decodeAndFillBuffer(decodeInputs, alignedStripe, dataBlkNum,
+          parityBlkNum, decoder);
+    }
+  }
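prepareParityChunk and decode above index the decode buffers through convertIndex4Decode, because the raw decoder expects its inputs in decoder order rather than block-group order. The sketch below illustrates such a conversion; the parity-first mapping shown is an assumption for illustration only, and the authoritative rule is StripedBlockUtil.convertIndex4Decode:

```java
/** Sketch: convert a block-group index into a decode-array index. */
final class DecodeIndexDemo {
  // ASSUMED mapping (parity first, then data) purely for illustration.
  static int convert(int idxInGroup, int dataBlkNum, int parityBlkNum) {
    return idxInGroup < dataBlkNum
        ? idxInGroup + parityBlkNum   // data block
        : idxInGroup - dataBlkNum;    // parity block
  }

  public static void main(String[] args) {
    int data = 6, parity = 3;
    for (int i = 0; i < data + parity; i++) {
      System.out.println(i + " -> " + convert(i, data, parity));
    }
  }
}
```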
+
+  class StatefulStripeReader extends StripeReader {
+    ByteBuffer[] decodeInputs;
+    final LocatedBlock[] targetBlocks;
+
+    StatefulStripeReader(CompletionService<Void> service,
+        AlignedStripe alignedStripe, LocatedBlock[] targetBlocks) {
+      super(service, alignedStripe);
+      this.targetBlocks = targetBlocks;
+    }
+
+    @Override
+    void readChunk(final CompletionService<Void> service,
+        final LocatedBlock block, int chunkIndex,
+        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+      StripingChunk chunk = alignedStripe.chunks[chunkIndex];
+      chunk.state = StripingChunk.PENDING;
+      ByteBufferStrategy strategy = new ByteBufferStrategy(chunk.byteBuffer);
+      Callable<Void> readCallable = readCell(blockReaders[chunkIndex],
+          currentNodes[chunkIndex], blockReaderOffsets[chunkIndex],
+          alignedStripe.getOffsetInBlock(), strategy,
+          chunk.byteBuffer.remaining(), corruptedBlockMap);
+      Future<Void> request = readingService.submit(readCallable);
+      futures.put(request, chunkIndex);
+    }
+
+    @Override
+    void updateState4SuccessRead(StripingChunkReadResult result) {
+      Preconditions.checkArgument(
+          result.state == StripingChunkReadResult.SUCCESSFUL);
+      blockReaderOffsets[result.index] =
+          alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock();
+    }
+
+    @Override
+    void prepareDecodeInputs() throws IOException {
+      if (decodeInputs == null) {
+        decodeInputs = new ByteBuffer[dataBlkNum + parityBlkNum];
+        ByteBuffer cur = curStripeBuf.duplicate();
+        StripedBlockUtil.VerticalRange range = alignedStripe.range;
+        for (int i = 0; i < dataBlkNum; i++) {
+          cur.limit(cur.capacity());
+          int pos = (int) (range.offsetInBlock % cellSize + cellSize * i);
+          cur.position(pos);
+          cur.limit((int) (pos + range.spanInBlock));
+          final int decodeIndex = convertIndex4Decode(i, dataBlkNum,
+              parityBlkNum);
+          decodeInputs[decodeIndex] = cur.slice();
+          if (alignedStripe.chunks[i] == null) {
+            alignedStripe.chunks[i] =
+                new StripingChunk(decodeInputs[decodeIndex]);
+          }
+        }
+      }
+    }
+
+    @Override
+    void prepareParityChunk() throws IOException {
+      for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) {
+        if (alignedStripe.chunks[i] == null) {
+          final int decodeIndex = convertIndex4Decode(i, dataBlkNum,
+              parityBlkNum);
+          decodeInputs[decodeIndex] = ByteBuffer.allocateDirect(
+              (int) alignedStripe.range.spanInBlock);
+          alignedStripe.chunks[i] = new StripingChunk(decodeInputs[decodeIndex]);
+          if (blockReaders[i] == null) {
+            prepareParityBlockReader(i);
+          }
+          break;
+        }
+      }
+    }
+
+    private void prepareParityBlockReader(int i) throws IOException {
+      // prepare the block reader for the parity chunk
+      LocatedBlock targetBlock = targetBlocks[i];
+      if (targetBlock != null) {
+        final long offsetInBlock = alignedStripe.getOffsetInBlock();
+        DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null);
+        if (retval != null) {
+          currentNodes[i] = retval.info;
+          blockReaders[i] = getBlockReaderWithRetry(targetBlock,
+              offsetInBlock, targetBlock.getBlockSize() - offsetInBlock,
+              retval.addr, retval.storageType, retval.info,
+              DFSStripedInputStream.this.getPos(), retry);
+          blockReaderOffsets[i] = offsetInBlock;
+        }
+      }
+    }
+
+    @Override
+    void decode() {
+      // TODO: no copy for data chunks. This depends on HADOOP-12047 for some
+      // decoders to work
+      final int span = (int) alignedStripe.getSpanInBlock();
+      for (int i = 0; i < alignedStripe.chunks.length; i++) {
+        final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
+        if (alignedStripe.chunks[i] != null &&
+            alignedStripe.chunks[i].state == StripingChunk.ALLZERO) {
+          for (int j = 0; j < span; j++) {
+            decodeInputs[decodeIndex].put((byte) 0);
+          }
+          decodeInputs[decodeIndex].flip();
+        } else if (alignedStripe.chunks[i] != null &&
+            alignedStripe.chunks[i].state == StripingChunk.FETCHED) {
+          decodeInputs[decodeIndex].position(0);
+          decodeInputs[decodeIndex].limit(span);
+        }
+      }
+      int[] decodeIndices = new int[parityBlkNum];
+      int pos = 0;
+      for (int i = 0; i < alignedStripe.chunks.length; i++) {
+        if (alignedStripe.chunks[i] != null &&
+            alignedStripe.chunks[i].state == StripingChunk.MISSING) {
+          decodeIndices[pos++] = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
+        }
+      }
+      decodeIndices = Arrays.copyOf(decodeIndices, pos);
+
+      final int decodeChunkNum = decodeIndices.length;
+      ByteBuffer[] outputs = new ByteBuffer[decodeChunkNum];
+      for (int i = 0; i < decodeChunkNum; i++) {
+        outputs[i] = decodeInputs[decodeIndices[i]];
+        outputs[i].position(0);
+        outputs[i].limit((int) alignedStripe.range.spanInBlock);
+        decodeInputs[decodeIndices[i]] = null;
+      }
+
+      decoder.decode(decodeInputs, decodeIndices, outputs);
     }
-    throw new InterruptedException("let's retry");
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 1db2045741f..a29e8e35185 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -33,6 +33,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 
+import java.nio.ByteBuffer;
 import java.util.*;
 import java.io.IOException;
 import java.util.concurrent.CancellationException;
@@ -79,7 +80,6 @@ public class StripedBlockUtil {
   public static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg,
       int cellSize, int dataBlkNum, int parityBlkNum) {
     int locatedBGSize = bg.getBlockIndices().length;
-    // TODO not considering missing blocks for now, only identify data blocks
     LocatedBlock[] lbs = new LocatedBlock[dataBlkNum + parityBlkNum];
     for (short i = 0; i < locatedBGSize; i++) {
       final int idx = bg.getBlockIndices()[i];
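StatefulStripeReader.prepareDecodeInputs above carves the data-block decode inputs directly out of curStripeBuf with duplicate/position/limit/slice, so no bytes are copied until decode time. A self-contained demonstration of that windowing idiom (sizes are arbitrary):

```java
import java.nio.ByteBuffer;

/** Sketch: carve zero-copy per-block windows out of one stripe buffer. */
public class StripeWindows {
  public static void main(String[] args) {
    final int cellSize = 4, dataBlkNum = 3;
    ByteBuffer stripe = ByteBuffer.allocate(cellSize * dataBlkNum);
    ByteBuffer[] windows = new ByteBuffer[dataBlkNum];
    ByteBuffer cur = stripe.duplicate(); // independent position/limit
    for (int i = 0; i < dataBlkNum; i++) {
      cur.limit(cur.capacity());   // reset before moving the position forward
      cur.position(i * cellSize);
      cur.limit((i + 1) * cellSize);
      windows[i] = cur.slice();    // shares the backing array, no copy
    }
    windows[1].put((byte) 42);     // lands in the stripe buffer itself
    System.out.println(stripe.get(cellSize)); // prints 42
  }
}
```

Resetting the limit to capacity before advancing the position matters: setting a position beyond the current limit would throw, so the window must be widened first and then narrowed.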
DFSClient.LOG.error("ExecutionException " + e); + DFSClient.LOG.warn("ExecutionException " + e); return new StripingChunkReadResult(futures.remove(future), StripingChunkReadResult.FAILED); } catch (CancellationException e) { @@ -253,12 +253,13 @@ public class StripedBlockUtil { int dataBlkNum, int parityBlkNum) { byte[][] decodeInputs = new byte[dataBlkNum + parityBlkNum][(int) alignedStripe.getSpanInBlock()]; - for (int i = 0; i < alignedStripe.chunks.length; i++) { + // read the full data aligned stripe + for (int i = 0; i < dataBlkNum; i++) { if (alignedStripe.chunks[i] == null) { final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum); alignedStripe.chunks[i] = new StripingChunk(decodeInputs[decodeIndex]); - alignedStripe.chunks[i].offsetsInBuf.add(0); - alignedStripe.chunks[i].lengthsInBuf.add((int) alignedStripe.getSpanInBlock()); + alignedStripe.chunks[i].addByteArraySlice(0, + (int) alignedStripe.getSpanInBlock()); } } return decodeInputs; @@ -276,14 +277,9 @@ public class StripedBlockUtil { for (int i = 0; i < alignedStripe.chunks.length; i++) { final StripingChunk chunk = alignedStripe.chunks[i]; final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum); - if (chunk.state == StripingChunk.FETCHED) { - int posInBuf = 0; - for (int j = 0; j < chunk.offsetsInBuf.size(); j++) { - System.arraycopy(chunk.buf, chunk.offsetsInBuf.get(j), - decodeInputs[decodeIndex], posInBuf, chunk.lengthsInBuf.get(j)); - posInBuf += chunk.lengthsInBuf.get(j); - } - } else if (chunk.state == StripingChunk.ALLZERO) { + if (chunk != null && chunk.state == StripingChunk.FETCHED) { + chunk.copyTo(decodeInputs[decodeIndex]); + } else if (chunk != null && chunk.state == StripingChunk.ALLZERO) { Arrays.fill(decodeInputs[decodeIndex], (byte) 0); } else { decodeInputs[decodeIndex] = null; @@ -315,13 +311,14 @@ public class StripedBlockUtil { * Decode based on the given input buffers and schema. */ public static void decodeAndFillBuffer(final byte[][] decodeInputs, - byte[] buf, AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum, + AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum, RawErasureDecoder decoder) { // Step 1: prepare indices and output buffers for missing data units int[] decodeIndices = new int[parityBlkNum]; int pos = 0; for (int i = 0; i < alignedStripe.chunks.length; i++) { - if (alignedStripe.chunks[i].state == StripingChunk.MISSING){ + if (alignedStripe.chunks[i] != null && + alignedStripe.chunks[i].state == StripingChunk.MISSING){ decodeIndices[pos++] = convertIndex4Decode(i, dataBlkNum, parityBlkNum); } } @@ -338,16 +335,58 @@ public class StripedBlockUtil { dataBlkNum, parityBlkNum); StripingChunk chunk = alignedStripe.chunks[missingBlkIdx]; if (chunk.state == StripingChunk.MISSING) { - int srcPos = 0; - for (int j = 0; j < chunk.offsetsInBuf.size(); j++) { - System.arraycopy(decodeOutputs[i], srcPos, buf, - chunk.offsetsInBuf.get(j), chunk.lengthsInBuf.get(j)); - srcPos += chunk.lengthsInBuf.get(j); - } + chunk.copyFrom(decodeOutputs[i]); } } } + /** + * Similar functionality with {@link #divideByteRangeIntoStripes}, but is used + * by stateful read and uses ByteBuffer as reading target buffer. Besides the + * read range is within a single stripe thus the calculation logic is simpler. 
+   */
+  public static AlignedStripe[] divideOneStripe(ECSchema ecSchema,
+      int cellSize, LocatedStripedBlock blockGroup, long rangeStartInBlockGroup,
+      long rangeEndInBlockGroup, ByteBuffer buf) {
+    final int dataBlkNum = ecSchema.getNumDataUnits();
+    // Step 1: map the byte range to StripingCells
+    StripingCell[] cells = getStripingCellsOfByteRange(ecSchema, cellSize,
+        blockGroup, rangeStartInBlockGroup, rangeEndInBlockGroup);
+
+    // Step 2: get the unmerged ranges on each internal block
+    VerticalRange[] ranges = getRangesForInternalBlocks(ecSchema, cellSize,
+        cells);
+
+    // Step 3: merge into stripes
+    AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecSchema, ranges);
+
+    // Step 4: calculate each chunk's position in destination buffer. Since the
+    // whole read range is within a single stripe, the logic is simpler here.
+    int bufOffset = (int) (rangeStartInBlockGroup % (cellSize * dataBlkNum));
+    for (StripingCell cell : cells) {
+      long cellStart = cell.idxInInternalBlk * cellSize + cell.offset;
+      long cellEnd = cellStart + cell.size - 1;
+      for (AlignedStripe s : stripes) {
+        long stripeEnd = s.getOffsetInBlock() + s.getSpanInBlock() - 1;
+        long overlapStart = Math.max(cellStart, s.getOffsetInBlock());
+        long overlapEnd = Math.min(cellEnd, stripeEnd);
+        int overLapLen = (int) (overlapEnd - overlapStart + 1);
+        if (overLapLen > 0) {
+          Preconditions.checkState(s.chunks[cell.idxInStripe] == null);
+          final int pos = (int) (bufOffset + overlapStart - cellStart);
+          buf.position(pos);
+          buf.limit(pos + overLapLen);
+          s.chunks[cell.idxInStripe] = new StripingChunk(buf.slice());
+        }
+      }
+      bufOffset += cell.size;
+    }
+
+    // Step 5: prepare ALLZERO blocks
+    prepareAllZeroChunks(blockGroup, stripes, cellSize, dataBlkNum);
+    return stripes;
+  }
+
   /**
    * This method divides a requested byte range into an array of inclusive
    * {@link AlignedStripe}.
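The cell arithmetic in Steps 1 and 4 is easiest to check with concrete numbers. The sketch below walks one example, assuming an RS-6-3 schema with a 64 KB cell; these parameters are illustrative, not mandated by the patch:

```java
/** Worked example: map a one-stripe byte range onto cells (assumed RS-6-3). */
public class CellMathDemo {
  public static void main(String[] args) {
    final int cellSize = 64 * 1024, dataBlkNum = 6;
    final int stripeSize = cellSize * dataBlkNum;      // 384 KB per stripe
    long rangeStart = 100_000, rangeEnd = 200_000;     // within stripe 0

    int firstCellIdx = (int) (rangeStart / cellSize);  // 1
    int lastCellIdx = (int) (rangeEnd / cellSize);     // 3
    System.out.println("cells " + firstCellIdx + ".." + lastCellIdx);

    // offset/size of the first cell's overlap with the range
    int firstCellOffset = (int) (rangeStart % cellSize);           // 34464
    long len = rangeEnd - rangeStart + 1;
    int firstCellSize = (int) Math.min(cellSize - firstCellOffset, len); // 31072
    System.out.println("first cell: offset=" + firstCellOffset
        + " size=" + firstCellSize);

    // position of the range start within the destination stripe buffer
    int bufOffset = (int) (rangeStart % stripeSize);   // 100000
    System.out.println("bufOffset=" + bufOffset);
  }
}
```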
@@ -369,7 +408,7 @@ public class StripedBlockUtil { int offsetInBuf) { // Step 0: analyze range and calculate basic parameters - int dataBlkNum = ecSchema.getNumDataUnits(); + final int dataBlkNum = ecSchema.getNumDataUnits(); // Step 1: map the byte range to StripingCells StripingCell[] cells = getStripingCellsOfByteRange(ecSchema, cellSize, @@ -386,7 +425,7 @@ public class StripedBlockUtil { calcualteChunkPositionsInBuf(cellSize, stripes, cells, buf, offsetInBuf); // Step 5: prepare ALLZERO blocks - prepareAllZeroChunks(blockGroup, buf, stripes, cellSize, dataBlkNum); + prepareAllZeroChunks(blockGroup, stripes, cellSize, dataBlkNum); return stripes; } @@ -403,23 +442,25 @@ public class StripedBlockUtil { Preconditions.checkArgument( rangeStartInBlockGroup <= rangeEndInBlockGroup && rangeEndInBlockGroup < blockGroup.getBlockSize()); - int len = (int) (rangeEndInBlockGroup - rangeStartInBlockGroup + 1); + long len = rangeEndInBlockGroup - rangeStartInBlockGroup + 1; int firstCellIdxInBG = (int) (rangeStartInBlockGroup / cellSize); int lastCellIdxInBG = (int) (rangeEndInBlockGroup / cellSize); int numCells = lastCellIdxInBG - firstCellIdxInBG + 1; StripingCell[] cells = new StripingCell[numCells]; - cells[0] = new StripingCell(ecSchema, cellSize, firstCellIdxInBG); - cells[numCells - 1] = new StripingCell(ecSchema, cellSize, lastCellIdxInBG); - cells[0].offset = (int) (rangeStartInBlockGroup % cellSize); - cells[0].size = - Math.min(cellSize - (int) (rangeStartInBlockGroup % cellSize), len); + final int firstCellOffset = (int) (rangeStartInBlockGroup % cellSize); + final int firstCellSize = + (int) Math.min(cellSize - (rangeStartInBlockGroup % cellSize), len); + cells[0] = new StripingCell(ecSchema, firstCellSize, firstCellIdxInBG, + firstCellOffset); if (lastCellIdxInBG != firstCellIdxInBG) { - cells[numCells - 1].size = (int) (rangeEndInBlockGroup % cellSize) + 1; + final int lastCellSize = (int) (rangeEndInBlockGroup % cellSize) + 1; + cells[numCells - 1] = new StripingCell(ecSchema, lastCellSize, + lastCellIdxInBG, 0); } for (int i = 1; i < numCells - 1; i++) { - cells[i] = new StripingCell(ecSchema, cellSize, i + firstCellIdxInBG); + cells[i] = new StripingCell(ecSchema, cellSize, i + firstCellIdxInBG, 0); } return cells; @@ -438,8 +479,8 @@ public class StripedBlockUtil { long[] startOffsets = new long[dataBlkNum + parityBlkNum]; Arrays.fill(startOffsets, -1L); int firstCellIdxInBG = (int) (rangeStartInBlockGroup / cellSize); - StripingCell firstCell = new StripingCell(ecSchema, cellSize, firstCellIdxInBG); - firstCell.offset = (int) (rangeStartInBlockGroup % cellSize); + StripingCell firstCell = new StripingCell(ecSchema, cellSize, + firstCellIdxInBG, (int) (rangeStartInBlockGroup % cellSize)); startOffsets[firstCell.idxInStripe] = firstCell.idxInInternalBlk * cellSize + firstCell.offset; long earliestStart = startOffsets[firstCell.idxInStripe]; @@ -448,7 +489,7 @@ public class StripedBlockUtil { if (idx * (long) cellSize >= blockGroup.getBlockSize()) { break; } - StripingCell cell = new StripingCell(ecSchema, cellSize, idx); + StripingCell cell = new StripingCell(ecSchema, cellSize, idx, 0); startOffsets[cell.idxInStripe] = cell.idxInInternalBlk * (long) cellSize; if (startOffsets[cell.idxInStripe] < earliestStart) { earliestStart = startOffsets[cell.idxInStripe]; @@ -563,10 +604,8 @@ public class StripedBlockUtil { if (s.chunks[cell.idxInStripe] == null) { s.chunks[cell.idxInStripe] = new StripingChunk(buf); } - - s.chunks[cell.idxInStripe].offsetsInBuf. 
- add((int)(offsetInBuf + done + overlapStart - cellStart)); - s.chunks[cell.idxInStripe].lengthsInBuf.add(overLapLen); + s.chunks[cell.idxInStripe].addByteArraySlice( + (int)(offsetInBuf + done + overlapStart - cellStart), overLapLen); } done += cell.size; } @@ -577,15 +616,14 @@ public class StripedBlockUtil { * size, the chunk should be treated as zero bytes in decoding. */ private static void prepareAllZeroChunks(LocatedStripedBlock blockGroup, - byte[] buf, AlignedStripe[] stripes, int cellSize, int dataBlkNum) { + AlignedStripe[] stripes, int cellSize, int dataBlkNum) { for (AlignedStripe s : stripes) { for (int i = 0; i < dataBlkNum; i++) { long internalBlkLen = getInternalBlockLength(blockGroup.getBlockSize(), cellSize, dataBlkNum, i); if (internalBlkLen <= s.getOffsetInBlock()) { Preconditions.checkState(s.chunks[i] == null); - s.chunks[i] = new StripingChunk(buf); - s.chunks[i].state = StripingChunk.ALLZERO; + s.chunks[i] = new StripingChunk(); // chunk state is set to ALLZERO } } } @@ -615,7 +653,7 @@ public class StripedBlockUtil { */ @VisibleForTesting static class StripingCell { - public final ECSchema schema; + final ECSchema schema; /** Logical order in a block group, used when doing I/O to a block group */ final int idxInBlkGroup; final int idxInInternalBlk; @@ -626,27 +664,17 @@ public class StripedBlockUtil { * {@link #size} variable represent the start offset and size of the * overlap. */ - int offset; - int size; + final int offset; + final int size; - StripingCell(ECSchema ecSchema, int cellSize, int idxInBlkGroup) { + StripingCell(ECSchema ecSchema, int cellSize, int idxInBlkGroup, + int offset) { this.schema = ecSchema; this.idxInBlkGroup = idxInBlkGroup; this.idxInInternalBlk = idxInBlkGroup / ecSchema.getNumDataUnits(); this.idxInStripe = idxInBlkGroup - this.idxInInternalBlk * ecSchema.getNumDataUnits(); - this.offset = 0; - this.size = cellSize; - } - - StripingCell(ECSchema ecSchema, int cellSize, int idxInInternalBlk, - int idxInStripe) { - this.schema = ecSchema; - this.idxInInternalBlk = idxInInternalBlk; - this.idxInStripe = idxInStripe; - this.idxInBlkGroup = - idxInInternalBlk * ecSchema.getNumDataUnits() + idxInStripe; - this.offset = 0; + this.offset = offset; this.size = cellSize; } } @@ -700,11 +728,6 @@ public class StripedBlockUtil { this.chunks = new StripingChunk[width]; } - public AlignedStripe(VerticalRange range, int width) { - this.range = range; - this.chunks = new StripingChunk[width]; - } - public boolean include(long pos) { return range.include(pos); } @@ -777,10 +800,6 @@ public class StripedBlockUtil { * |REQUESTED| |REQUESTED| ALLZERO | |null| |null| <- AlignedStripe2 * +---------+ +---------+ | +----+ +----+ * <----------- data blocks ------------> | <--- parity ---> - * - * The class also carries {@link #buf}, {@link #offsetsInBuf}, and - * {@link #lengthsInBuf} to define how read task for this chunk should - * deliver the returned data. */ public static class StripingChunk { /** Chunk has been successfully fetched */ @@ -808,11 +827,49 @@ public class StripedBlockUtil { * null (AlignedStripe created) -> REQUESTED (upon failure) -> PENDING ... 
    */
   public int state = REQUESTED;
-  public byte[] buf;
-  public List<Integer> offsetsInBuf;
-  public List<Integer> lengthsInBuf;
+
+  public final ChunkByteArray byteArray;
+  public final ByteBuffer byteBuffer;
 
   public StripingChunk(byte[] buf) {
+    this.byteArray = new ChunkByteArray(buf);
+    byteBuffer = null;
+  }
+
+  public StripingChunk(ByteBuffer buf) {
+    this.byteArray = null;
+    this.byteBuffer = buf;
+  }
+
+  public StripingChunk() {
+    this.byteArray = null;
+    this.byteBuffer = null;
+    this.state = ALLZERO;
+  }
+
+  public void addByteArraySlice(int offset, int length) {
+    assert byteArray != null;
+    byteArray.offsetsInBuf.add(offset);
+    byteArray.lengthsInBuf.add(length);
+  }
+
+  void copyTo(byte[] target) {
+    assert byteArray != null;
+    byteArray.copyTo(target);
+  }
+
+  void copyFrom(byte[] src) {
+    assert byteArray != null;
+    byteArray.copyFrom(src);
+  }
+ }
+
+ public static class ChunkByteArray {
+  private final byte[] buf;
+  private final List<Integer> offsetsInBuf;
+  private final List<Integer> lengthsInBuf;
+
+  ChunkByteArray(byte[] buf) {
     this.buf = buf;
     this.offsetsInBuf = new ArrayList<>();
     this.lengthsInBuf = new ArrayList<>();
@@ -833,6 +890,28 @@
     }
     return lens;
   }
+
+  public byte[] buf() {
+    return buf;
+  }
+
+  void copyTo(byte[] target) {
+    int posInBuf = 0;
+    for (int i = 0; i < offsetsInBuf.size(); i++) {
+      System.arraycopy(buf, offsetsInBuf.get(i),
+          target, posInBuf, lengthsInBuf.get(i));
+      posInBuf += lengthsInBuf.get(i);
+    }
+  }
+
+  void copyFrom(byte[] src) {
+    int srcPos = 0;
+    for (int j = 0; j < offsetsInBuf.size(); j++) {
+      System.arraycopy(src, srcPos, buf, offsetsInBuf.get(j),
+          lengthsInBuf.get(j));
+      srcPos += lengthsInBuf.get(j);
+    }
+  }
 }
 
 /**
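ChunkByteArray keeps parallel offset/length lists describing where a chunk's bytes live inside a shared application buffer; copyTo gathers those slices into one dense decode input, and copyFrom scatters decoded bytes back into the same slices. A runnable miniature of that gather/scatter (the buffer contents are made up):

```java
import java.util.List;

/** Sketch: gather/scatter between a shared buffer and a dense array. */
public class GatherScatterDemo {
  public static void main(String[] args) {
    byte[] shared = "....ABCD....EF..".getBytes();
    List<Integer> offsets = List.of(4, 12); // slices of the shared buffer
    List<Integer> lengths = List.of(4, 2);

    // copyTo: gather the slices into one dense decode input
    byte[] dense = new byte[6];
    int pos = 0;
    for (int i = 0; i < offsets.size(); i++) {
      System.arraycopy(shared, offsets.get(i), dense, pos, lengths.get(i));
      pos += lengths.get(i);
    }
    System.out.println(new String(dense)); // ABCDEF

    // copyFrom: scatter decoded bytes back into the same slices
    byte[] decoded = "abcdef".getBytes();
    int src = 0;
    for (int i = 0; i < offsets.size(); i++) {
      System.arraycopy(decoded, src, shared, offsets.get(i), lengths.get(i));
      src += lengths.get(i);
    }
    System.out.println(new String(shared)); // ....abcd....ef..
  }
}
```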
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
index 54367d743af..23697040db7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
@@ -32,7 +32,7 @@ public class StripedFileTestUtil {
   static final int blockSize = cellSize * stripesPerBlock;
   static final int numDNs = dataBlocks + parityBlocks + 2;
 
-  static final Random r = new Random();
+  static final Random random = new Random();
 
   static byte[] generateBytes(int cnt) {
     byte[] bytes = new byte[cnt];
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
index a28f88ef8bb..0201d071ab2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
@@ -29,6 +29,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
 import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize;
@@ -58,28 +59,28 @@ public class TestReadStripedFileWithDecoding {
   }
 
   @Test
-  public void testWritePreadWithDNFailure1() throws IOException {
-    testWritePreadWithDNFailure("/foo", cellSize * (dataBlocks + 2), 0);
+  public void testReadWithDNFailure1() throws IOException {
+    testReadWithDNFailure("/foo", cellSize * (dataBlocks + 2), 0);
   }
 
   @Test
-  public void testWritePreadWithDNFailure2() throws IOException {
-    testWritePreadWithDNFailure("/foo", cellSize * (dataBlocks + 2), cellSize * 5);
+  public void testReadWithDNFailure2() throws IOException {
+    testReadWithDNFailure("/foo", cellSize * (dataBlocks + 2), cellSize * 5);
   }
 
   @Test
-  public void testWritePreadWithDNFailure3() throws IOException {
-    testWritePreadWithDNFailure("/foo", cellSize * dataBlocks, 0);
+  public void testReadWithDNFailure3() throws IOException {
+    testReadWithDNFailure("/foo", cellSize * dataBlocks, 0);
   }
 
-  private void testWritePreadWithDNFailure(String file, int fileSize, int startOffsetInFile)
-      throws IOException {
+  private void testReadWithDNFailure(String file, int fileSize,
+      int startOffsetInFile) throws IOException {
     final int failedDNIdx = 2;
     Path testPath = new Path(file);
     final byte[] bytes = StripedFileTestUtil.generateBytes(fileSize);
     DFSTestUtil.writeFile(fs, testPath, bytes);
 
-    // shut down the DN that holds the last internal data block
+    // shut down the DN that holds an internal data block
     BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5,
         cellSize);
     String name = (locs[0].getNames())[failedDNIdx];
@@ -99,14 +100,30 @@ public class TestReadStripedFileWithDecoding {
           fileSize - startOffsetInFile, readLen);
 
       byte[] expected = new byte[readLen];
-      for (int i = startOffsetInFile; i < fileSize; i++) {
-        expected[i - startOffsetInFile] = StripedFileTestUtil.getByte(i);
-      }
+      System.arraycopy(bytes, startOffsetInFile, expected, 0,
+          fileSize - startOffsetInFile);
 
       for (int i = startOffsetInFile; i < fileSize; i++) {
         Assert.assertEquals("Byte at " + i + " should be the same",
             expected[i - startOffsetInFile], buf[i - startOffsetInFile]);
       }
     }
+
+    // stateful read
+    ByteBuffer result = ByteBuffer.allocate(fileSize);
+    ByteBuffer buf = ByteBuffer.allocate(1024);
+    int readLen = 0;
+    int ret;
+    try (FSDataInputStream in = fs.open(testPath)) {
+      while ((ret = in.read(buf)) >= 0) {
+        readLen += ret;
+        buf.flip();
+        result.put(buf);
+        buf.clear();
+      }
+    }
+    Assert.assertEquals("The length of file should be the same as the write size",
+        fileSize, readLen);
+    Assert.assertArrayEquals(bytes, result.array());
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
index e2e52467a62..272650d358c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
@@ -17,17 +17,21 @@
  */
 package org.apache.hadoop.hdfs;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.web.ByteRangeInputStream;
 import org.apache.hadoop.hdfs.web.WebHdfsConstants;
 import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.EOFException;
@@ -41,12 +45,13 @@ import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
 import static
org.apache.hadoop.hdfs.StripedFileTestUtil.stripesPerBlock; public class TestWriteReadStripedFile { + public static final Log LOG = LogFactory.getLog(TestWriteReadStripedFile.class); private static MiniDFSCluster cluster; private static FileSystem fs; private static Configuration conf; - @BeforeClass - public static void setup() throws IOException { + @Before + public void setup() throws IOException { conf = new HdfsConfiguration(); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); @@ -55,8 +60,8 @@ public class TestWriteReadStripedFile { fs = cluster.getFileSystem(); } - @AfterClass - public static void tearDown() throws IOException { + @After + public void tearDown() throws IOException { if (cluster != null) { cluster.shutdown(); } @@ -65,75 +70,98 @@ public class TestWriteReadStripedFile { @Test public void testFileEmpty() throws IOException { testOneFileUsingDFSStripedInputStream("/EmptyFile", 0); + testOneFileUsingDFSStripedInputStream("/EmptyFile2", 0, true); } @Test public void testFileSmallerThanOneCell1() throws IOException { testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", 1); + testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell2", 1, true); } @Test public void testFileSmallerThanOneCell2() throws IOException { testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", cellSize - 1); + testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell2", cellSize - 1, + true); } @Test public void testFileEqualsWithOneCell() throws IOException { testOneFileUsingDFSStripedInputStream("/EqualsWithOneCell", cellSize); + testOneFileUsingDFSStripedInputStream("/EqualsWithOneCell2", cellSize, true); } @Test public void testFileSmallerThanOneStripe1() throws IOException { testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe", cellSize * dataBlocks - 1); + testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe2", + cellSize * dataBlocks - 1, true); } @Test public void testFileSmallerThanOneStripe2() throws IOException { testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe", cellSize + 123); + testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe2", + cellSize + 123, true); } @Test public void testFileEqualsWithOneStripe() throws IOException { testOneFileUsingDFSStripedInputStream("/EqualsWithOneStripe", cellSize * dataBlocks); + testOneFileUsingDFSStripedInputStream("/EqualsWithOneStripe2", + cellSize * dataBlocks, true); } @Test public void testFileMoreThanOneStripe1() throws IOException { testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe1", cellSize * dataBlocks + 123); + testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe12", + cellSize * dataBlocks + 123, true); } @Test public void testFileMoreThanOneStripe2() throws IOException { testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe2", cellSize * dataBlocks + cellSize * dataBlocks + 123); + testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe22", + cellSize * dataBlocks + cellSize * dataBlocks + 123, true); } @Test public void testLessThanFullBlockGroup() throws IOException { testOneFileUsingDFSStripedInputStream("/LessThanFullBlockGroup", cellSize * dataBlocks * (stripesPerBlock - 1) + cellSize); + testOneFileUsingDFSStripedInputStream("/LessThanFullBlockGroup2", + cellSize * dataBlocks * (stripesPerBlock - 1) + cellSize, true); } @Test public void testFileFullBlockGroup() throws IOException { testOneFileUsingDFSStripedInputStream("/FullBlockGroup", blockSize * dataBlocks); + 
testOneFileUsingDFSStripedInputStream("/FullBlockGroup2", + blockSize * dataBlocks, true); } @Test public void testFileMoreThanABlockGroup1() throws IOException { testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup1", blockSize * dataBlocks + 123); + testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup12", + blockSize * dataBlocks + 123, true); } @Test public void testFileMoreThanABlockGroup2() throws IOException { testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2", blockSize * dataBlocks + cellSize + 123); + testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup22", + blockSize * dataBlocks + cellSize + 123, true); } @@ -142,6 +170,9 @@ public class TestWriteReadStripedFile { testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup3", blockSize * dataBlocks * 3 + cellSize * dataBlocks + cellSize + 123); + testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup32", + blockSize * dataBlocks * 3 + cellSize * dataBlocks + + cellSize + 123, true); } private void assertSeekAndRead(FSDataInputStream fsdis, int pos, @@ -158,12 +189,23 @@ public class TestWriteReadStripedFile { private void testOneFileUsingDFSStripedInputStream(String src, int fileLength) throws IOException { + testOneFileUsingDFSStripedInputStream(src, fileLength, false); + } + + private void testOneFileUsingDFSStripedInputStream(String src, int fileLength, + boolean withDataNodeFailure) throws IOException { final byte[] expected = StripedFileTestUtil.generateBytes(fileLength); Path srcPath = new Path(src); DFSTestUtil.writeFile(fs, srcPath, new String(expected)); verifyLength(fs, srcPath, fileLength); + if (withDataNodeFailure) { + int dnIndex = 1; // TODO: StripedFileTestUtil.random.nextInt(dataBlocks); + LOG.info("stop DataNode " + dnIndex); + stopDataNode(srcPath, dnIndex); + } + byte[] smallBuf = new byte[1024]; byte[] largeBuf = new byte[fileLength + 100]; verifyPread(fs, srcPath, fileLength, expected, largeBuf); @@ -177,6 +219,21 @@ public class TestWriteReadStripedFile { ByteBuffer.allocate(1024)); } + private void stopDataNode(Path path, int failedDNIdx) + throws IOException { + BlockLocation[] locs = fs.getFileBlockLocations(path, 0, cellSize); + if (locs != null && locs.length > 0) { + String name = (locs[0].getNames())[failedDNIdx]; + for (DataNode dn : cluster.getDataNodes()) { + int port = dn.getXferPort(); + if (name.contains(Integer.toString(port))) { + dn.shutdown(); + break; + } + } + } + } + @Test public void testWriteReadUsingWebHdfs() throws Exception { int fileLength = blockSize * dataBlocks + cellSize + 123; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java index 5a1c3fc0965..5d85073994f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java @@ -152,7 +152,7 @@ public class TestStripedBlockUtil { int done = 0; while (done < bgSize) { Preconditions.checkState(done % CELLSIZE == 0); - StripingCell cell = new StripingCell(SCEHMA, CELLSIZE, done / CELLSIZE); + StripingCell cell = new StripingCell(SCEHMA, CELLSIZE, done / CELLSIZE, 0); int idxInStripe = cell.idxInStripe; int size = Math.min(CELLSIZE, bgSize - done); for (int i = 0; i < size; i++) { @@ -176,8 +176,7 @@ public class TestStripedBlockUtil { assertFalse(blocks[i].isStriped()); 
assertEquals(i, BlockIdManager.getBlockIndex(blocks[i].getBlock().getLocalBlock())); - assertEquals(i * CELLSIZE, blocks[i].getStartOffset()); - /** TODO: properly define {@link LocatedBlock#offset} for internal blocks */ + assertEquals(0, blocks[i].getStartOffset()); assertEquals(1, blocks[i].getLocations().length); assertEquals(i, blocks[i].getLocations()[0].getIpcPort()); assertEquals(i, blocks[i].getLocations()[0].getXferPort()); @@ -256,11 +255,12 @@ public class TestStripedBlockUtil { continue; } int done = 0; - for (int j = 0; j < chunk.getLengths().length; j++) { + for (int j = 0; j < chunk.byteArray.getLengths().length; j++) { System.arraycopy(internalBlkBufs[i], (int) stripe.getOffsetInBlock() + done, assembled, - chunk.getOffsets()[j], chunk.getLengths()[j]); - done += chunk.getLengths()[j]; + chunk.byteArray.getOffsets()[j], + chunk.byteArray.getLengths()[j]); + done += chunk.byteArray.getLengths()[j]; } } }
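Taken together, the tests pin down the user-visible behavior of this change: a plain sequential read keeps returning correct data after a DataNode serving one internal block disappears, with decoding happening transparently inside DFSStripedInputStream. The client-side loop is just the ordinary ByteBufferReadable pattern; the path and buffer size below are illustrative:

```java
import java.nio.ByteBuffer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StatefulReadExample {
  /** Drain a file with repeated ByteBuffer reads; decoding is transparent. */
  static long readFully(FileSystem fs, Path path) throws Exception {
    long total = 0;
    ByteBuffer buf = ByteBuffer.allocate(64 * 1024);
    try (FSDataInputStream in = fs.open(path)) {
      int n;
      while ((n = in.read(buf)) >= 0) {
        total += n;
        buf.clear(); // this sketch discards the bytes; real code consumes them
      }
    }
    return total;
  }
}
```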