diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt index 47093886628..10a8cdea433 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -364,3 +364,6 @@ to be consistent with trunk. (zhz) HDFS-8433. Erasure coding: set blockToken in LocatedStripedBlock.(waltersu4549) + + HDFS-8760. Erasure Coding: reuse BlockReader when reading the same block in pread. + (jing9) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 5b10ffe45ad..6c3f0ee35c0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -44,7 +44,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import com.google.common.base.Preconditions; import org.apache.commons.io.IOUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.ByteBufferReadable; @@ -1139,18 +1138,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, }; } - /** - * Used when reading contiguous blocks - */ - private void actualGetFromOneDataNode(final DNAddrPair datanode, - LocatedBlock block, final long start, final long end, byte[] buf, - int offset, Map> corruptedBlockMap) - throws IOException { - final int length = (int) (end - start + 1); - actualGetFromOneDataNode(datanode, block, start, end, buf, - new int[]{offset}, new int[]{length}, corruptedBlockMap); - } - /** * Read data from one DataNode. * @param datanode the datanode from which to read data @@ -1158,23 +1145,18 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, * @param startInBlk the startInBlk offset of the block * @param endInBlk the endInBlk offset of the block * @param buf the given byte array into which the data is read - * @param offsets the data may be read into multiple segments of the buf - * (when reading a striped block). this array indicates the - * offset of each buf segment. - * @param lengths the length of each buf segment + * @param offset the offset in buf * @param corruptedBlockMap map recording list of datanodes with corrupted * block replica */ - void actualGetFromOneDataNode(final DNAddrPair datanode, - LocatedBlock block, final long startInBlk, final long endInBlk, - byte[] buf, int[] offsets, int[] lengths, + void actualGetFromOneDataNode(final DNAddrPair datanode, LocatedBlock block, + final long startInBlk, final long endInBlk, byte[] buf, int offset, Map> corruptedBlockMap) throws IOException { DFSClientFaultInjector.get().startFetchFromDatanode(); int refetchToken = 1; // only need to get a new access token once int refetchEncryptionKey = 1; // only need to get a new encryption key once final int len = (int) (endInBlk - startInBlk + 1); - checkReadPortions(offsets, lengths, len); while (true) { // cached block locations may have been updated by chooseDataNode() @@ -1186,13 +1168,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, DFSClientFaultInjector.get().fetchFromDatanodeException(); reader = getBlockReader(block, startInBlk, len, datanode.addr, datanode.storageType, datanode.info); - for (int i = 0; i < offsets.length; i++) { - int nread = reader.readAll(buf, offsets[i], lengths[i]); - updateReadStatistics(readStatistics, nread, reader); - if (nread != lengths[i]) { - throw new IOException("truncated return from reader.read(): " + - "excpected " + lengths[i] + ", got " + nread); - } + int nread = reader.readAll(buf, offset, len); + updateReadStatistics(readStatistics, nread, reader); + if (nread != len) { + throw new IOException("truncated return from reader.read(): " + + "excpected " + len + ", got " + nread); } DFSClientFaultInjector.get().readFromDatanodeDelay(); return; @@ -1247,24 +1227,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, return getBlockAt(block.getStartOffset()); } - /** - * This method verifies that the read portions are valid and do not overlap - * with each other. - */ - private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) { - Preconditions.checkArgument(offsets.length == lengths.length && offsets.length > 0); - int sum = 0; - for (int i = 0; i < lengths.length; i++) { - if (i > 0) { - int gap = offsets[i] - offsets[i - 1]; - // make sure read portions do not overlap with each other - Preconditions.checkArgument(gap >= lengths[i - 1]); - } - sum += lengths[i]; - } - Preconditions.checkArgument(sum == totalLen); - } - /** * Like {@link #fetchBlockByteRange}except we start up a second, parallel, * 'hedged' read if the first read is taking longer than configured amount of diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index 75090036f55..eecdf67eecd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -31,14 +31,6 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager; import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.io.ByteBufferPool; -import static org.apache.hadoop.hdfs.util.StripedBlockUtil.convertIndex4Decode; -import static org.apache.hadoop.hdfs.util.StripedBlockUtil.divideByteRangeIntoStripes; -import static org.apache.hadoop.hdfs.util.StripedBlockUtil.finalizeDecodeInputs; -import static org.apache.hadoop.hdfs.util.StripedBlockUtil.decodeAndFillBuffer; -import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getNextCompletedStripedRead; -import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getStartOffsetsForInternalBlocks; -import static org.apache.hadoop.hdfs.util.StripedBlockUtil.initDecodeInputs; -import static org.apache.hadoop.hdfs.util.StripedBlockUtil.parseStripedBlockGroup; import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe; import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk; import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult; @@ -48,10 +40,6 @@ import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; -import org.apache.hadoop.net.NetUtils; -import org.apache.htrace.Span; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; import java.io.EOFException; import java.io.IOException; @@ -166,7 +154,6 @@ public class DFSStripedInputStream extends DFSInputStream { */ private StripeRange curStripeRange; private final CompletionService readingService; - private ReaderRetryPolicy retry; DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, ECSchema schema, int cellSize, @@ -198,18 +185,6 @@ public class DFSStripedInputStream extends DFSInputStream { curStripeRange = new StripeRange(0, 0); } - @Override - public synchronized int read(final ByteBuffer buf) throws IOException { - ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf); - TraceScope scope = - dfsClient.getPathTraceScope("DFSInputStream#byteBufferRead", src); - try { - return readWithStrategy(byteBufferReader, 0, buf.remaining()); - } finally { - scope.close(); - } - } - /** * When seeking into a new block group, create blockReader for each internal * block in the group. @@ -229,33 +204,6 @@ public class DFSStripedInputStream extends DFSInputStream { this.blockEnd = targetBlockGroup.getStartOffset() + targetBlockGroup.getBlockSize() - 1; currentLocatedBlock = targetBlockGroup; - - final long offsetIntoBlockGroup = getOffsetInBlockGroup(); - LocatedBlock[] targetBlocks = parseStripedBlockGroup( - targetBlockGroup, cellSize, dataBlkNum, parityBlkNum); - // The purpose is to get start offset into each block. - long[] offsetsForInternalBlocks = getStartOffsetsForInternalBlocks(schema, - cellSize, targetBlockGroup, offsetIntoBlockGroup); - Preconditions.checkState(offsetsForInternalBlocks.length == - dataBlkNum + parityBlkNum); - long minOffset = offsetsForInternalBlocks[dataBlkNum]; - - retry = new ReaderRetryPolicy(); - for (int i = 0; i < dataBlkNum; i++) { - LocatedBlock targetBlock = targetBlocks[i]; - if (targetBlock != null) { - DNAddrPair dnInfo = getBestNodeDNAddrPair(targetBlock, null); - if (dnInfo != null) { - BlockReader reader = getBlockReaderWithRetry(targetBlock, - minOffset, targetBlock.getBlockSize() - minOffset, - dnInfo.addr, dnInfo.storageType, dnInfo.info, target, retry); - if (reader != null) { - blockReaders[i] = new BlockReaderInfo(reader, targetBlock, - dnInfo.info, minOffset); - } - } - } - } } /** @@ -308,16 +256,16 @@ public class DFSStripedInputStream extends DFSInputStream { return; } for (int i = 0; i < groupSize; i++) { - closeReader(i); + closeReader(blockReaders[i]); blockReaders[i] = null; } blockEnd = -1; } - private void closeReader(int index) { - if (blockReaders[index] != null) { - IOUtils.cleanup(DFSClient.LOG, blockReaders[index].reader); - blockReaders[index].skip(); + private void closeReader(BlockReaderInfo readerInfo) { + if (readerInfo != null) { + IOUtils.cleanup(DFSClient.LOG, readerInfo.reader); + readerInfo.skip(); } } @@ -358,17 +306,17 @@ public class DFSStripedInputStream extends DFSInputStream { for (AlignedStripe stripe : stripes) { // Parse group to get chosen DN location StripeReader sreader = new StatefulStripeReader(readingService, stripe, - blks, corruptedBlockMap); + blks, blockReaders, corruptedBlockMap); sreader.readStripe(); } curStripeBuf.position(stripeBufOffset); curStripeBuf.limit(stripeLimit); } - private Callable readCell(final BlockReader reader, + private Callable readCells(final BlockReader reader, final DatanodeInfo datanode, final long currentReaderOffset, - final long targetReaderOffset, final ByteBufferStrategy strategy, - final int targetLength, final ExtendedBlock currentBlock, + final long targetReaderOffset, final ByteBufferStrategy[] strategies, + final ExtendedBlock currentBlock, final Map> corruptedBlockMap) { return new Callable() { @Override @@ -386,27 +334,31 @@ public class DFSStripedInputStream extends DFSInputStream { skipped == targetReaderOffset - currentReaderOffset); } int result = 0; - while (result < targetLength) { - int ret = readToBuffer(reader, datanode, strategy, currentBlock, + for (ByteBufferStrategy strategy : strategies) { + result += readToBuffer(reader, datanode, strategy, currentBlock, corruptedBlockMap); - if (ret < 0) { - throw new IOException("Unexpected EOS from the reader"); - } - result += ret; } - updateReadStatistics(readStatistics, targetLength, reader); return null; } }; } private int readToBuffer(BlockReader blockReader, - DatanodeInfo currentNode, ByteBufferStrategy readerStrategy, + DatanodeInfo currentNode, ByteBufferStrategy strategy, ExtendedBlock currentBlock, Map> corruptedBlockMap) throws IOException { + final int targetLength = strategy.buf.remaining(); + int length = 0; try { - return readerStrategy.doRead(blockReader, 0, 0); + while (length < targetLength) { + int ret = strategy.doRead(blockReader, 0, 0); + if (ret < 0) { + throw new IOException("Unexpected EOS from the reader"); + } + length += ret; + } + return length; } catch (ChecksumException ce) { DFSClient.LOG.warn("Found Checksum error for " + currentBlock + " from " + currentNode @@ -572,61 +524,49 @@ public class DFSStripedInputStream extends DFSInputStream { // Refresh the striped block group LocatedStripedBlock blockGroup = getBlockGroupAt(block.getStartOffset()); - AlignedStripe[] stripes = divideByteRangeIntoStripes(schema, cellSize, - blockGroup, start, end, buf, offset); + AlignedStripe[] stripes = StripedBlockUtil.divideByteRangeIntoStripes( + schema, cellSize, blockGroup, start, end, buf, offset); CompletionService readService = new ExecutorCompletionService<>( dfsClient.getStripedReadsThreadPool()); final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup( blockGroup, cellSize, dataBlkNum, parityBlkNum); - for (AlignedStripe stripe : stripes) { - // Parse group to get chosen DN location - StripeReader preader = new PositionStripeReader(readService, stripe, - blks, corruptedBlockMap); - preader.readStripe(); + final BlockReaderInfo[] preaderInfos = new BlockReaderInfo[groupSize]; + try { + for (AlignedStripe stripe : stripes) { + // Parse group to get chosen DN location + StripeReader preader = new PositionStripeReader(readService, stripe, + blks, preaderInfos, corruptedBlockMap); + preader.readStripe(); + } + } finally { + for (BlockReaderInfo preaderInfo : preaderInfos) { + closeReader(preaderInfo); + } } } - private Callable getFromOneDataNode(final DNAddrPair datanode, - final LocatedBlock block, final long start, final long end, - final byte[] buf, final int[] offsets, final int[] lengths, - final Map> corruptedBlockMap, - final int hedgedReadId) { - final Span parentSpan = Trace.currentSpan(); - return new Callable() { - @Override - public Void call() throws Exception { - TraceScope scope = - Trace.startSpan("Parallel reading " + hedgedReadId, parentSpan); - try { - actualGetFromOneDataNode(datanode, block, start, - end, buf, offsets, lengths, corruptedBlockMap); - } finally { - scope.close(); - } - return null; - } - }; - } - + /** + * The reader for reading a complete {@link AlignedStripe}. Note that an + * {@link AlignedStripe} may cross multiple stripes with cellSize width. + */ private abstract class StripeReader { final Map, Integer> futures = new HashMap<>(); final AlignedStripe alignedStripe; final CompletionService service; final LocatedBlock[] targetBlocks; final Map> corruptedBlockMap; + final BlockReaderInfo[] readerInfos; StripeReader(CompletionService service, AlignedStripe alignedStripe, - LocatedBlock[] targetBlocks, + LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos, Map> corruptedBlockMap) { this.service = service; this.alignedStripe = alignedStripe; this.targetBlocks = targetBlocks; + this.readerInfos = readerInfos; this.corruptedBlockMap = corruptedBlockMap; } - abstract boolean readChunk(final CompletionService service, - final LocatedBlock block, int chunkIndex); - /** prepare all the data chunks */ abstract void prepareDecodeInputs(); @@ -635,7 +575,12 @@ public class DFSStripedInputStream extends DFSInputStream { abstract void decode(); - abstract void updateState4SuccessRead(StripingChunkReadResult result); + void updateState4SuccessRead(StripingChunkReadResult result) { + Preconditions.checkArgument( + result.state == StripingChunkReadResult.SUCCESSFUL); + readerInfos[result.index].setOffset(alignedStripe.getOffsetInBlock() + + alignedStripe.getSpanInBlock()); + } private void checkMissingBlocks() throws IOException { if (alignedStripe.missingChunksNum > parityBlkNum) { @@ -654,7 +599,7 @@ public class DFSStripedInputStream extends DFSInputStream { for (int i = 0; i < dataBlkNum; i++) { Preconditions.checkNotNull(alignedStripe.chunks[i]); if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) { - if (!readChunk(service, targetBlocks[i], i)) { + if (!readChunk(targetBlocks[i], i)) { alignedStripe.missingChunksNum++; } } @@ -666,7 +611,7 @@ public class DFSStripedInputStream extends DFSInputStream { for (int i = dataBlkNum, j = 0; i < dataBlkNum + parityBlkNum && j < num; i++) { if (alignedStripe.chunks[i] == null) { - if (prepareParityChunk(i) && readChunk(service, targetBlocks[i], i)) { + if (prepareParityChunk(i) && readChunk(targetBlocks[i], i)) { j++; } else { alignedStripe.missingChunksNum++; @@ -676,12 +621,75 @@ public class DFSStripedInputStream extends DFSInputStream { checkMissingBlocks(); } + boolean createBlockReader(LocatedBlock block, int chunkIndex) + throws IOException { + DNAddrPair dnInfo = getBestNodeDNAddrPair(block, null); + if (dnInfo != null) { + BlockReader reader = getBlockReaderWithRetry(block, + alignedStripe.getOffsetInBlock(), + block.getBlockSize() - alignedStripe.getOffsetInBlock(), + dnInfo.addr, dnInfo.storageType, dnInfo.info, + block.getStartOffset(), new ReaderRetryPolicy()); + if (reader != null) { + readerInfos[chunkIndex] = new BlockReaderInfo(reader, block, + dnInfo.info, alignedStripe.getOffsetInBlock()); + return true; + } + } + return false; + } + + private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) { + if (chunk.byteBuffer != null) { + ByteBufferStrategy strategy = new ByteBufferStrategy(chunk.byteBuffer); + return new ByteBufferStrategy[]{strategy}; + } else { + ByteBufferStrategy[] strategies = + new ByteBufferStrategy[chunk.byteArray.getOffsets().length]; + for (int i = 0; i < strategies.length; i++) { + ByteBuffer buffer = ByteBuffer.wrap(chunk.byteArray.buf(), + chunk.byteArray.getOffsets()[i], chunk.byteArray.getLengths()[i]); + strategies[i] = new ByteBufferStrategy(buffer); + } + return strategies; + } + } + + boolean readChunk(final LocatedBlock block, int chunkIndex) + throws IOException { + final StripingChunk chunk = alignedStripe.chunks[chunkIndex]; + if (block == null) { + chunk.state = StripingChunk.MISSING; + return false; + } + if (readerInfos[chunkIndex] == null) { + if (!createBlockReader(block, chunkIndex)) { + chunk.state = StripingChunk.MISSING; + return false; + } + } else if (readerInfos[chunkIndex].shouldSkip) { + chunk.state = StripingChunk.MISSING; + return false; + } + + chunk.state = StripingChunk.PENDING; + Callable readCallable = readCells(readerInfos[chunkIndex].reader, + readerInfos[chunkIndex].datanode, + readerInfos[chunkIndex].blockReaderOffset, + alignedStripe.getOffsetInBlock(), getReadStrategies(chunk), + block.getBlock(), corruptedBlockMap); + + Future request = service.submit(readCallable); + futures.put(request, chunkIndex); + return true; + } + /** read the whole stripe. do decoding if necessary */ void readStripe() throws IOException { for (int i = 0; i < dataBlkNum; i++) { if (alignedStripe.chunks[i] != null && alignedStripe.chunks[i].state != StripingChunk.ALLZERO) { - if (!readChunk(service, targetBlocks[i], i)) { + if (!readChunk(targetBlocks[i], i)) { alignedStripe.missingChunksNum++; } } @@ -700,8 +708,8 @@ public class DFSStripedInputStream extends DFSInputStream { // first read failure while (!futures.isEmpty()) { try { - StripingChunkReadResult r = getNextCompletedStripedRead(service, - futures, 0); + StripingChunkReadResult r = StripedBlockUtil + .getNextCompletedStripedRead(service, futures, 0); if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("Read task returned: " + r + ", for stripe " + alignedStripe); @@ -721,7 +729,7 @@ public class DFSStripedInputStream extends DFSInputStream { } else { returnedChunk.state = StripingChunk.MISSING; // close the corresponding reader - closeReader(r.index); + closeReader(readerInfos[r.index]); final int missing = alignedStripe.missingChunksNum; alignedStripe.missingChunksNum++; @@ -750,48 +758,17 @@ public class DFSStripedInputStream extends DFSInputStream { PositionStripeReader(CompletionService service, AlignedStripe alignedStripe, LocatedBlock[] targetBlocks, + BlockReaderInfo[] readerInfos, Map> corruptedBlockMap) { - super(service, alignedStripe, targetBlocks, corruptedBlockMap); + super(service, alignedStripe, targetBlocks, readerInfos, + corruptedBlockMap); } - @Override - boolean readChunk(final CompletionService service, - final LocatedBlock block, int chunkIndex) { - final StripingChunk chunk = alignedStripe.chunks[chunkIndex]; - if (block == null) { - chunk.state = StripingChunk.MISSING; - return false; - } - DatanodeInfo loc = block.getLocations()[0]; - StorageType type = block.getStorageTypes()[0]; - DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr( - loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())), - type); - chunk.state = StripingChunk.PENDING; - Callable readCallable = getFromOneDataNode(dnAddr, - block, alignedStripe.getOffsetInBlock(), - alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock() - 1, - chunk.byteArray.buf(), chunk.byteArray.getOffsets(), - chunk.byteArray.getLengths(), corruptedBlockMap, chunkIndex); - Future getFromDNRequest = service.submit(readCallable); - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Submitting striped read request for " + chunkIndex - + ". Info of the block: " + block + ", offset in block is " - + alignedStripe.getOffsetInBlock() + ", end is " - + (alignedStripe.getOffsetInBlock() - + alignedStripe.getSpanInBlock() - 1)); - } - futures.put(getFromDNRequest, chunkIndex); - return true; - } - - @Override - void updateState4SuccessRead(StripingChunkReadResult r) {} - @Override void prepareDecodeInputs() { if (decodeInputs == null) { - decodeInputs = initDecodeInputs(alignedStripe, dataBlkNum, parityBlkNum); + decodeInputs = StripedBlockUtil.initDecodeInputs(alignedStripe, + dataBlkNum, parityBlkNum); } } @@ -799,8 +776,8 @@ public class DFSStripedInputStream extends DFSInputStream { boolean prepareParityChunk(int index) { Preconditions.checkState(index >= dataBlkNum && alignedStripe.chunks[index] == null); - final int decodeIndex = convertIndex4Decode(index, dataBlkNum, - parityBlkNum); + final int decodeIndex = StripedBlockUtil.convertIndex4Decode(index, + dataBlkNum, parityBlkNum); alignedStripe.chunks[index] = new StripingChunk(decodeInputs[decodeIndex]); alignedStripe.chunks[index].addByteArraySlice(0, (int) alignedStripe.getSpanInBlock()); @@ -809,10 +786,10 @@ public class DFSStripedInputStream extends DFSInputStream { @Override void decode() { - finalizeDecodeInputs(decodeInputs, dataBlkNum, parityBlkNum, - alignedStripe); - decodeAndFillBuffer(decodeInputs, alignedStripe, dataBlkNum, - parityBlkNum, decoder); + StripedBlockUtil.finalizeDecodeInputs(decodeInputs, dataBlkNum, + parityBlkNum, alignedStripe); + StripedBlockUtil.decodeAndFillBuffer(decodeInputs, alignedStripe, + dataBlkNum, parityBlkNum, decoder); } } @@ -821,36 +798,10 @@ public class DFSStripedInputStream extends DFSInputStream { StatefulStripeReader(CompletionService service, AlignedStripe alignedStripe, LocatedBlock[] targetBlocks, + BlockReaderInfo[] readerInfos, Map> corruptedBlockMap) { - super(service, alignedStripe, targetBlocks, corruptedBlockMap); - } - - @Override - boolean readChunk(final CompletionService service, - final LocatedBlock block, int chunkIndex) { - final StripingChunk chunk = alignedStripe.chunks[chunkIndex]; - final BlockReaderInfo readerInfo = blockReaders[chunkIndex]; - if (readerInfo == null || block == null || readerInfo.shouldSkip) { - chunk.state = StripingChunk.MISSING; - return false; - } - chunk.state = StripingChunk.PENDING; - ByteBufferStrategy strategy = new ByteBufferStrategy(chunk.byteBuffer); - Callable readCallable = readCell(readerInfo.reader, - readerInfo.datanode, readerInfo.blockReaderOffset, - alignedStripe.getOffsetInBlock(), strategy, - chunk.byteBuffer.remaining(), block.getBlock(), corruptedBlockMap); - Future request = readingService.submit(readCallable); - futures.put(request, chunkIndex); - return true; - } - - @Override - void updateState4SuccessRead(StripingChunkReadResult result) { - Preconditions.checkArgument( - result.state == StripingChunkReadResult.SUCCESSFUL); - blockReaders[result.index].setOffset(alignedStripe.getOffsetInBlock() - + alignedStripe.getSpanInBlock()); + super(service, alignedStripe, targetBlocks, readerInfos, + corruptedBlockMap); } @Override @@ -864,8 +815,8 @@ public class DFSStripedInputStream extends DFSInputStream { int pos = (int) (range.offsetInBlock % cellSize + cellSize * i); cur.position(pos); cur.limit((int) (pos + range.spanInBlock)); - final int decodeIndex = convertIndex4Decode(i, dataBlkNum, - parityBlkNum); + final int decodeIndex = StripedBlockUtil.convertIndex4Decode(i, + dataBlkNum, parityBlkNum); decodeInputs[decodeIndex] = cur.slice(); if (alignedStripe.chunks[i] == null) { alignedStripe.chunks[i] = new StripingChunk( @@ -884,45 +835,20 @@ public class DFSStripedInputStream extends DFSInputStream { // we have failed the block reader before return false; } - final int decodeIndex = convertIndex4Decode(index, dataBlkNum, - parityBlkNum); + final int decodeIndex = StripedBlockUtil.convertIndex4Decode(index, + dataBlkNum, parityBlkNum); decodeInputs[decodeIndex] = ByteBuffer.allocateDirect( (int) alignedStripe.range.spanInBlock); alignedStripe.chunks[index] = new StripingChunk(decodeInputs[decodeIndex]); - if (blockReaders[index] == null && !prepareParityBlockReader(index)) { - alignedStripe.chunks[index] = new StripingChunk(StripingChunk.MISSING); - return false; - } return true; } - private boolean prepareParityBlockReader(int i) throws IOException { - // prepare the block reader for the parity chunk - LocatedBlock targetBlock = targetBlocks[i]; - if (targetBlock != null) { - final long offsetInBlock = alignedStripe.getOffsetInBlock(); - DNAddrPair dnInfo = getBestNodeDNAddrPair(targetBlock, null); - if (dnInfo != null) { - BlockReader reader = getBlockReaderWithRetry(targetBlock, - offsetInBlock, targetBlock.getBlockSize() - offsetInBlock, - dnInfo.addr, dnInfo.storageType, dnInfo.info, - DFSStripedInputStream.this.getPos(), retry); - if (reader != null) { - blockReaders[i] = new BlockReaderInfo(reader, targetBlock, - dnInfo.info, offsetInBlock); - return true; - } - } - } - return false; - } - @Override void decode() { // TODO no copy for data chunks. this depends on HADOOP-12047 final int span = (int) alignedStripe.getSpanInBlock(); for (int i = 0; i < alignedStripe.chunks.length; i++) { - final int decodeIndex = convertIndex4Decode(i, + final int decodeIndex = StripedBlockUtil.convertIndex4Decode(i, dataBlkNum, parityBlkNum); if (alignedStripe.chunks[i] != null && alignedStripe.chunks[i].state == StripingChunk.ALLZERO) { @@ -941,7 +867,7 @@ public class DFSStripedInputStream extends DFSInputStream { for (int i = 0; i < alignedStripe.chunks.length; i++) { if (alignedStripe.chunks[i] != null && alignedStripe.chunks[i].state == StripingChunk.MISSING) { - decodeIndices[pos++] = convertIndex4Decode(i, + decodeIndices[pos++] = StripedBlockUtil.convertIndex4Decode(i, dataBlkNum, parityBlkNum); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java index ddfd1ea546c..dcab0751d8c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java @@ -27,7 +27,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.BlockStoragePolicySpi; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -53,8 +52,6 @@ import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength; - import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java index 9b0939c6dcc..3e5ef431986 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java @@ -476,41 +476,6 @@ public class StripedBlockUtil { return cells; } - /** - * Given a logical start offset in a block group, calculate the physical - * start offset into each stored internal block. - */ - public static long[] getStartOffsetsForInternalBlocks(ECSchema ecSchema, - int cellSize, LocatedStripedBlock blockGroup, long rangeStartInBlockGroup) { - Preconditions.checkArgument( - rangeStartInBlockGroup < blockGroup.getBlockSize()); - int dataBlkNum = ecSchema.getNumDataUnits(); - int parityBlkNum = ecSchema.getNumParityUnits(); - long[] startOffsets = new long[dataBlkNum + parityBlkNum]; - Arrays.fill(startOffsets, -1L); - int firstCellIdxInBG = (int) (rangeStartInBlockGroup / cellSize); - StripingCell firstCell = new StripingCell(ecSchema, cellSize, - firstCellIdxInBG, (int) (rangeStartInBlockGroup % cellSize)); - startOffsets[firstCell.idxInStripe] = - firstCell.idxInInternalBlk * cellSize + firstCell.offset; - long earliestStart = startOffsets[firstCell.idxInStripe]; - for (int i = 1; i < dataBlkNum; i++) { - int idx = firstCellIdxInBG + i; - if (idx * (long) cellSize >= blockGroup.getBlockSize()) { - break; - } - StripingCell cell = new StripingCell(ecSchema, cellSize, idx, 0); - startOffsets[cell.idxInStripe] = cell.idxInInternalBlk * (long) cellSize; - if (startOffsets[cell.idxInStripe] < earliestStart) { - earliestStart = startOffsets[cell.idxInStripe]; - } - } - for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) { - startOffsets[i] = earliestStart; - } - return startOffsets; - } - /** * Given a logical byte range, mapped to each {@link StripingCell}, calculate * the physical byte range (inclusive) on each stored internal block. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java index 815a50d80a5..2866a0ee0c6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java @@ -79,10 +79,19 @@ public class StripedFileTestUtil { for (int startOffset : startOffsets) { startOffset = Math.max(0, Math.min(startOffset, fileLength - 1)); int remaining = fileLength - startOffset; - in.readFully(startOffset, buf, 0, remaining); - for (int i = 0; i < remaining; i++) { - Assert.assertEquals("Byte at " + (startOffset + i) + " should be the " + - "same", expected[startOffset + i], buf[i]); + int offset = startOffset; + final byte[] result = new byte[remaining]; + while (remaining > 0) { + int target = Math.min(remaining, buf.length); + in.readFully(offset, buf, 0, target); + System.arraycopy(buf, 0, result, offset - startOffset, target); + remaining -= target; + offset += target; + } + for (int i = 0; i < fileLength - startOffset; i++) { + Assert.assertEquals("Byte at " + (startOffset + i) + " is different, " + + "the startOffset is " + startOffset, + expected[startOffset + i], result[i]); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java index 2f9322dcdd0..089a13438e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java @@ -19,13 +19,16 @@ package org.apache.hadoop.hdfs; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.web.WebHdfsConstants; import org.apache.hadoop.hdfs.web.WebHdfsTestUtil; +import org.apache.log4j.Level; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -45,6 +48,11 @@ public class TestWriteReadStripedFile { private static FileSystem fs; private static Configuration conf = new HdfsConfiguration(); + static { + ((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class)) + .getLogger().setLevel(Level.ALL); + } + @Before public void setup() throws IOException { conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); @@ -232,7 +240,8 @@ public class TestWriteReadStripedFile { byte[] smallBuf = new byte[1024]; byte[] largeBuf = new byte[fileLength + 100]; - StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf); + // TODO: HDFS-8797 + //StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf); StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf); StripedFileTestUtil.verifySeek(fs, srcPath, fileLength);