HDFS-10861. Refactor StripeReaders and use ECChunk version decode API. Contributed by Sammi Chen
This commit is contained in:
parent
2b66d9ec5b
commit
734d54c1a8
|
@ -29,6 +29,9 @@ public class ECChunk {
|
||||||
|
|
||||||
private ByteBuffer chunkBuffer;
|
private ByteBuffer chunkBuffer;
|
||||||
|
|
||||||
|
// TODO: should be in a more general flags
|
||||||
|
private boolean allZero = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wrapping a ByteBuffer
|
* Wrapping a ByteBuffer
|
||||||
* @param buffer buffer to be wrapped by the chunk
|
* @param buffer buffer to be wrapped by the chunk
|
||||||
|
@ -37,6 +40,13 @@ public class ECChunk {
|
||||||
this.chunkBuffer = buffer;
|
this.chunkBuffer = buffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ECChunk(ByteBuffer buffer, int offset, int len) {
|
||||||
|
ByteBuffer tmp = buffer.duplicate();
|
||||||
|
tmp.position(offset);
|
||||||
|
tmp.limit(offset + len);
|
||||||
|
this.chunkBuffer = tmp.slice();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wrapping a bytes array
|
* Wrapping a bytes array
|
||||||
* @param buffer buffer to be wrapped by the chunk
|
* @param buffer buffer to be wrapped by the chunk
|
||||||
|
@ -45,6 +55,18 @@ public class ECChunk {
|
||||||
this.chunkBuffer = ByteBuffer.wrap(buffer);
|
this.chunkBuffer = ByteBuffer.wrap(buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ECChunk(byte[] buffer, int offset, int len) {
|
||||||
|
this.chunkBuffer = ByteBuffer.wrap(buffer, offset, len);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isAllZero() {
|
||||||
|
return allZero;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAllZero(boolean allZero) {
|
||||||
|
this.allZero = allZero;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convert to ByteBuffer
|
* Convert to ByteBuffer
|
||||||
* @return ByteBuffer
|
* @return ByteBuffer
|
||||||
|
|
|
@ -115,6 +115,9 @@ final class CoderUtil {
|
||||||
buffers[i] = null;
|
buffers[i] = null;
|
||||||
} else {
|
} else {
|
||||||
buffers[i] = chunk.getBuffer();
|
buffers[i] = chunk.getBuffer();
|
||||||
|
if (chunk.isAllZero()) {
|
||||||
|
CoderUtil.resetBuffer(buffers[i], buffers[i].remaining());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -240,7 +240,7 @@ public class DFSInputStream extends FSInputStream
|
||||||
Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
|
Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
|
||||||
Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
|
Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
|
||||||
while (oldIter.hasNext() && newIter.hasNext()) {
|
while (oldIter.hasNext() && newIter.hasNext()) {
|
||||||
if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
|
if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) {
|
||||||
throw new IOException("Blocklist for " + src + " has changed!");
|
throw new IOException("Blocklist for " + src + " has changed!");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -677,8 +677,8 @@ public class DFSInputStream extends FSInputStream
|
||||||
if (oneByteBuf == null) {
|
if (oneByteBuf == null) {
|
||||||
oneByteBuf = new byte[1];
|
oneByteBuf = new byte[1];
|
||||||
}
|
}
|
||||||
int ret = read( oneByteBuf, 0, 1 );
|
int ret = read(oneByteBuf, 0, 1);
|
||||||
return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
|
return (ret <= 0) ? -1 : (oneByteBuf[0] & 0xff);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This is a used by regular read() and handles ChecksumExceptions.
|
/* This is a used by regular read() and handles ChecksumExceptions.
|
||||||
|
@ -702,7 +702,7 @@ public class DFSInputStream extends FSInputStream
|
||||||
// retry as many times as seekToNewSource allows.
|
// retry as many times as seekToNewSource allows.
|
||||||
try {
|
try {
|
||||||
return reader.readFromBlock(blockReader, len);
|
return reader.readFromBlock(blockReader, len);
|
||||||
} catch ( ChecksumException ce ) {
|
} catch (ChecksumException ce) {
|
||||||
DFSClient.LOG.warn("Found Checksum error for "
|
DFSClient.LOG.warn("Found Checksum error for "
|
||||||
+ getCurrentBlock() + " from " + currentNode
|
+ getCurrentBlock() + " from " + currentNode
|
||||||
+ " at " + ce.getPos());
|
+ " at " + ce.getPos());
|
||||||
|
@ -710,7 +710,7 @@ public class DFSInputStream extends FSInputStream
|
||||||
retryCurrentNode = false;
|
retryCurrentNode = false;
|
||||||
// we want to remember which block replicas we have tried
|
// we want to remember which block replicas we have tried
|
||||||
corruptedBlocks.addCorruptedBlock(getCurrentBlock(), currentNode);
|
corruptedBlocks.addCorruptedBlock(getCurrentBlock(), currentNode);
|
||||||
} catch ( IOException e ) {
|
} catch (IOException e) {
|
||||||
if (!retryCurrentNode) {
|
if (!retryCurrentNode) {
|
||||||
DFSClient.LOG.warn("Exception while reading from "
|
DFSClient.LOG.warn("Exception while reading from "
|
||||||
+ getCurrentBlock() + " of " + src + " from "
|
+ getCurrentBlock() + " of " + src + " from "
|
||||||
|
@ -779,7 +779,9 @@ public class DFSInputStream extends FSInputStream
|
||||||
DFSClient.LOG.warn("DFS Read", e);
|
DFSClient.LOG.warn("DFS Read", e);
|
||||||
}
|
}
|
||||||
blockEnd = -1;
|
blockEnd = -1;
|
||||||
if (currentNode != null) { addToDeadNodes(currentNode); }
|
if (currentNode != null) {
|
||||||
|
addToDeadNodes(currentNode);
|
||||||
|
}
|
||||||
if (--retries == 0) {
|
if (--retries == 0) {
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
@ -1397,10 +1399,10 @@ public class DFSInputStream extends FSInputStream
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long skip(long n) throws IOException {
|
public long skip(long n) throws IOException {
|
||||||
if ( n > 0 ) {
|
if (n > 0) {
|
||||||
long curPos = getPos();
|
long curPos = getPos();
|
||||||
long fileLen = getFileLength();
|
long fileLen = getFileLength();
|
||||||
if( n+curPos > fileLen ) {
|
if (n+curPos > fileLen) {
|
||||||
n = fileLen - curPos;
|
n = fileLen - curPos;
|
||||||
}
|
}
|
||||||
seek(curPos+n);
|
seek(curPos+n);
|
||||||
|
@ -1550,7 +1552,7 @@ public class DFSInputStream extends FSInputStream
|
||||||
* Get statistics about the reads which this DFSInputStream has done.
|
* Get statistics about the reads which this DFSInputStream has done.
|
||||||
*/
|
*/
|
||||||
public ReadStatistics getReadStatistics() {
|
public ReadStatistics getReadStatistics() {
|
||||||
return new ReadStatistics(readStatistics);
|
return readStatistics;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -17,24 +17,21 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.fs.ChecksumException;
|
|
||||||
import org.apache.hadoop.fs.ReadOption;
|
import org.apache.hadoop.fs.ReadOption;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
|
||||||
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
|
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
|
||||||
|
import org.apache.hadoop.hdfs.StripeReader.BlockReaderInfo;
|
||||||
|
import org.apache.hadoop.hdfs.StripeReader.ReaderRetryPolicy;
|
||||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||||
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
|
||||||
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripeRange;
|
||||||
import org.apache.hadoop.io.ByteBufferPool;
|
import org.apache.hadoop.io.ByteBufferPool;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
|
|
||||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
|
|
||||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
|
|
||||||
|
|
||||||
import org.apache.hadoop.io.ElasticByteBufferPool;
|
import org.apache.hadoop.io.ElasticByteBufferPool;
|
||||||
import org.apache.hadoop.io.erasurecode.CodecUtil;
|
import org.apache.hadoop.io.erasurecode.CodecUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
|
@ -44,7 +41,6 @@ import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
|
||||||
|
|
||||||
import java.io.EOFException;
|
import java.io.EOFException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -53,111 +49,32 @@ import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.concurrent.CompletionService;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ExecutorCompletionService;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
import java.util.concurrent.Future;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* DFSStripedInputStream reads from striped block groups
|
* DFSStripedInputStream reads from striped block groups.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class DFSStripedInputStream extends DFSInputStream {
|
public class DFSStripedInputStream extends DFSInputStream {
|
||||||
|
|
||||||
private static class ReaderRetryPolicy {
|
|
||||||
private int fetchEncryptionKeyTimes = 1;
|
|
||||||
private int fetchTokenTimes = 1;
|
|
||||||
|
|
||||||
void refetchEncryptionKey() {
|
|
||||||
fetchEncryptionKeyTimes--;
|
|
||||||
}
|
|
||||||
|
|
||||||
void refetchToken() {
|
|
||||||
fetchTokenTimes--;
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean shouldRefetchEncryptionKey() {
|
|
||||||
return fetchEncryptionKeyTimes > 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean shouldRefetchToken() {
|
|
||||||
return fetchTokenTimes > 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Used to indicate the buffered data's range in the block group */
|
|
||||||
private static class StripeRange {
|
|
||||||
/** start offset in the block group (inclusive) */
|
|
||||||
final long offsetInBlock;
|
|
||||||
/** length of the stripe range */
|
|
||||||
final long length;
|
|
||||||
|
|
||||||
StripeRange(long offsetInBlock, long length) {
|
|
||||||
Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
|
|
||||||
this.offsetInBlock = offsetInBlock;
|
|
||||||
this.length = length;
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean include(long pos) {
|
|
||||||
return pos >= offsetInBlock && pos < offsetInBlock + length;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class BlockReaderInfo {
|
|
||||||
final BlockReader reader;
|
|
||||||
final DatanodeInfo datanode;
|
|
||||||
/**
|
|
||||||
* when initializing block readers, their starting offsets are set to the same
|
|
||||||
* number: the smallest internal block offsets among all the readers. This is
|
|
||||||
* because it is possible that for some internal blocks we have to read
|
|
||||||
* "backwards" for decoding purpose. We thus use this offset array to track
|
|
||||||
* offsets for all the block readers so that we can skip data if necessary.
|
|
||||||
*/
|
|
||||||
long blockReaderOffset;
|
|
||||||
/**
|
|
||||||
* We use this field to indicate whether we should use this reader. In case
|
|
||||||
* we hit any issue with this reader, we set this field to true and avoid
|
|
||||||
* using it for the next stripe.
|
|
||||||
*/
|
|
||||||
boolean shouldSkip = false;
|
|
||||||
|
|
||||||
BlockReaderInfo(BlockReader reader, DatanodeInfo dn, long offset) {
|
|
||||||
this.reader = reader;
|
|
||||||
this.datanode = dn;
|
|
||||||
this.blockReaderOffset = offset;
|
|
||||||
}
|
|
||||||
|
|
||||||
void setOffset(long offset) {
|
|
||||||
this.blockReaderOffset = offset;
|
|
||||||
}
|
|
||||||
|
|
||||||
void skip() {
|
|
||||||
this.shouldSkip = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
|
private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
|
||||||
|
|
||||||
private final BlockReaderInfo[] blockReaders;
|
private final BlockReaderInfo[] blockReaders;
|
||||||
private final int cellSize;
|
private final int cellSize;
|
||||||
private final short dataBlkNum;
|
private final short dataBlkNum;
|
||||||
private final short parityBlkNum;
|
private final short parityBlkNum;
|
||||||
private final int groupSize;
|
private final int groupSize;
|
||||||
/** the buffer for a complete stripe */
|
/** the buffer for a complete stripe. */
|
||||||
private ByteBuffer curStripeBuf;
|
private ByteBuffer curStripeBuf;
|
||||||
private ByteBuffer parityBuf;
|
private ByteBuffer parityBuf;
|
||||||
private final ErasureCodingPolicy ecPolicy;
|
private final ErasureCodingPolicy ecPolicy;
|
||||||
private final RawErasureDecoder decoder;
|
private final RawErasureDecoder decoder;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* indicate the start/end offset of the current buffered stripe in the
|
* Indicate the start/end offset of the current buffered stripe in the
|
||||||
* block group
|
* block group.
|
||||||
*/
|
*/
|
||||||
private StripeRange curStripeRange;
|
private StripeRange curStripeRange;
|
||||||
private final CompletionService<Void> readingService;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* When warning the user of a lost block in striping mode, we remember the
|
* When warning the user of a lost block in striping mode, we remember the
|
||||||
|
@ -167,8 +84,8 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||||
*
|
*
|
||||||
* To minimize the overhead, we only store the datanodeUuid in this set
|
* To minimize the overhead, we only store the datanodeUuid in this set
|
||||||
*/
|
*/
|
||||||
private final Set<String> warnedNodes = Collections.newSetFromMap(
|
private final Set<String> warnedNodes =
|
||||||
new ConcurrentHashMap<String, Boolean>());
|
Collections.newSetFromMap(new ConcurrentHashMap<>());
|
||||||
|
|
||||||
DFSStripedInputStream(DFSClient dfsClient, String src,
|
DFSStripedInputStream(DFSClient dfsClient, String src,
|
||||||
boolean verifyChecksum, ErasureCodingPolicy ecPolicy,
|
boolean verifyChecksum, ErasureCodingPolicy ecPolicy,
|
||||||
|
@ -183,8 +100,6 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||||
groupSize = dataBlkNum + parityBlkNum;
|
groupSize = dataBlkNum + parityBlkNum;
|
||||||
blockReaders = new BlockReaderInfo[groupSize];
|
blockReaders = new BlockReaderInfo[groupSize];
|
||||||
curStripeRange = new StripeRange(0, 0);
|
curStripeRange = new StripeRange(0, 0);
|
||||||
readingService =
|
|
||||||
new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
|
|
||||||
ErasureCoderOptions coderOptions = new ErasureCoderOptions(
|
ErasureCoderOptions coderOptions = new ErasureCoderOptions(
|
||||||
dataBlkNum, parityBlkNum);
|
dataBlkNum, parityBlkNum);
|
||||||
decoder = CodecUtil.createRawDecoder(dfsClient.getConfiguration(),
|
decoder = CodecUtil.createRawDecoder(dfsClient.getConfiguration(),
|
||||||
|
@ -198,7 +113,7 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||||
return decoder.preferDirectBuffer();
|
return decoder.preferDirectBuffer();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void resetCurStripeBuffer() {
|
void resetCurStripeBuffer() {
|
||||||
if (curStripeBuf == null) {
|
if (curStripeBuf == null) {
|
||||||
curStripeBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
|
curStripeBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
|
||||||
cellSize * dataBlkNum);
|
cellSize * dataBlkNum);
|
||||||
|
@ -207,7 +122,7 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||||
curStripeRange = new StripeRange(0, 0);
|
curStripeRange = new StripeRange(0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ByteBuffer getParityBuffer() {
|
protected ByteBuffer getParityBuffer() {
|
||||||
if (parityBuf == null) {
|
if (parityBuf == null) {
|
||||||
parityBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
|
parityBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
|
||||||
cellSize * parityBlkNum);
|
cellSize * parityBlkNum);
|
||||||
|
@ -216,6 +131,29 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||||
return parityBuf;
|
return parityBuf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected ByteBuffer getCurStripeBuf() {
|
||||||
|
return curStripeBuf;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getSrc() {
|
||||||
|
return src;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected DFSClient getDFSClient() {
|
||||||
|
return dfsClient;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected LocatedBlocks getLocatedBlocks() {
|
||||||
|
return locatedBlocks;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ByteBufferPool getBufferPool() {
|
||||||
|
return BUFFER_POOL;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ThreadPoolExecutor getStripedReadsThreadPool(){
|
||||||
|
return dfsClient.getStripedReadsThreadPool();
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* When seeking into a new block group, create blockReader for each internal
|
* When seeking into a new block group, create blockReader for each internal
|
||||||
* block in the group.
|
* block in the group.
|
||||||
|
@ -268,7 +206,7 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||||
blockEnd = -1;
|
blockEnd = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void closeReader(BlockReaderInfo readerInfo) {
|
protected void closeReader(BlockReaderInfo readerInfo) {
|
||||||
if (readerInfo != null) {
|
if (readerInfo != null) {
|
||||||
if (readerInfo.reader != null) {
|
if (readerInfo.reader != null) {
|
||||||
try {
|
try {
|
||||||
|
@ -288,6 +226,59 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||||
return pos - currentLocatedBlock.getStartOffset();
|
return pos - currentLocatedBlock.getStartOffset();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean createBlockReader(LocatedBlock block, long offsetInBlock,
|
||||||
|
LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos,
|
||||||
|
int chunkIndex) throws IOException {
|
||||||
|
BlockReader reader = null;
|
||||||
|
final ReaderRetryPolicy retry = new ReaderRetryPolicy();
|
||||||
|
DFSInputStream.DNAddrPair dnInfo =
|
||||||
|
new DFSInputStream.DNAddrPair(null, null, null);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
// the cached block location might have been re-fetched, so always
|
||||||
|
// get it from cache.
|
||||||
|
block = refreshLocatedBlock(block);
|
||||||
|
targetBlocks[chunkIndex] = block;
|
||||||
|
|
||||||
|
// internal block has one location, just rule out the deadNodes
|
||||||
|
dnInfo = getBestNodeDNAddrPair(block, null);
|
||||||
|
if (dnInfo == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
reader = getBlockReader(block, offsetInBlock,
|
||||||
|
block.getBlockSize() - offsetInBlock,
|
||||||
|
dnInfo.addr, dnInfo.storageType, dnInfo.info);
|
||||||
|
} catch (IOException e) {
|
||||||
|
if (e instanceof InvalidEncryptionKeyException &&
|
||||||
|
retry.shouldRefetchEncryptionKey()) {
|
||||||
|
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
|
||||||
|
+ "encryption key was invalid when connecting to " + dnInfo.addr
|
||||||
|
+ " : " + e);
|
||||||
|
dfsClient.clearDataEncryptionKey();
|
||||||
|
retry.refetchEncryptionKey();
|
||||||
|
} else if (retry.shouldRefetchToken() &&
|
||||||
|
tokenRefetchNeeded(e, dnInfo.addr)) {
|
||||||
|
fetchBlockAt(block.getStartOffset());
|
||||||
|
retry.refetchToken();
|
||||||
|
} else {
|
||||||
|
//TODO: handles connection issues
|
||||||
|
DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " +
|
||||||
|
"block" + block.getBlock(), e);
|
||||||
|
// re-fetch the block in case the block has been moved
|
||||||
|
fetchBlockAt(block.getStartOffset());
|
||||||
|
addToDeadNodes(dnInfo.info);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (reader != null) {
|
||||||
|
readerInfos[chunkIndex] =
|
||||||
|
new BlockReaderInfo(reader, dnInfo.info, offsetInBlock);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read a new stripe covering the current position, and store the data in the
|
* Read a new stripe covering the current position, and store the data in the
|
||||||
* {@link #curStripeBuf}.
|
* {@link #curStripeBuf}.
|
||||||
|
@ -303,20 +294,20 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||||
final int stripeBufOffset = (int) (offsetInBlockGroup % stripeLen);
|
final int stripeBufOffset = (int) (offsetInBlockGroup % stripeLen);
|
||||||
final int stripeLimit = (int) Math.min(currentLocatedBlock.getBlockSize()
|
final int stripeLimit = (int) Math.min(currentLocatedBlock.getBlockSize()
|
||||||
- (stripeIndex * stripeLen), stripeLen);
|
- (stripeIndex * stripeLen), stripeLen);
|
||||||
StripeRange stripeRange = new StripeRange(offsetInBlockGroup,
|
StripeRange stripeRange =
|
||||||
stripeLimit - stripeBufOffset);
|
new StripeRange(offsetInBlockGroup, stripeLimit - stripeBufOffset);
|
||||||
|
|
||||||
LocatedStripedBlock blockGroup = (LocatedStripedBlock) currentLocatedBlock;
|
LocatedStripedBlock blockGroup = (LocatedStripedBlock) currentLocatedBlock;
|
||||||
AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(ecPolicy,
|
AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(ecPolicy,
|
||||||
cellSize, blockGroup, offsetInBlockGroup,
|
cellSize, blockGroup, offsetInBlockGroup,
|
||||||
offsetInBlockGroup + stripeRange.length - 1, curStripeBuf);
|
offsetInBlockGroup + stripeRange.getLength() - 1, curStripeBuf);
|
||||||
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
|
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
|
||||||
blockGroup, cellSize, dataBlkNum, parityBlkNum);
|
blockGroup, cellSize, dataBlkNum, parityBlkNum);
|
||||||
// read the whole stripe
|
// read the whole stripe
|
||||||
for (AlignedStripe stripe : stripes) {
|
for (AlignedStripe stripe : stripes) {
|
||||||
// Parse group to get chosen DN location
|
// Parse group to get chosen DN location
|
||||||
StripeReader sreader = new StatefulStripeReader(readingService, stripe,
|
StripeReader sreader = new StatefulStripeReader(stripe, ecPolicy, blks,
|
||||||
blks, blockReaders, corruptedBlocks);
|
blockReaders, corruptedBlocks, decoder, this);
|
||||||
sreader.readStripe();
|
sreader.readStripe();
|
||||||
}
|
}
|
||||||
curStripeBuf.position(stripeBufOffset);
|
curStripeBuf.position(stripeBufOffset);
|
||||||
|
@ -324,69 +315,8 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||||
curStripeRange = stripeRange;
|
curStripeRange = stripeRange;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Callable<Void> readCells(final BlockReader reader,
|
|
||||||
final DatanodeInfo datanode, final long currentReaderOffset,
|
|
||||||
final long targetReaderOffset, final ByteBufferStrategy[] strategies,
|
|
||||||
final ExtendedBlock currentBlock,
|
|
||||||
final CorruptedBlocks corruptedBlocks) {
|
|
||||||
return new Callable<Void>() {
|
|
||||||
@Override
|
|
||||||
public Void call() throws Exception {
|
|
||||||
// reader can be null if getBlockReaderWithRetry failed or
|
|
||||||
// the reader hit exception before
|
|
||||||
if (reader == null) {
|
|
||||||
throw new IOException("The BlockReader is null. " +
|
|
||||||
"The BlockReader creation failed or the reader hit exception.");
|
|
||||||
}
|
|
||||||
Preconditions.checkState(currentReaderOffset <= targetReaderOffset);
|
|
||||||
if (currentReaderOffset < targetReaderOffset) {
|
|
||||||
long skipped = reader.skip(targetReaderOffset - currentReaderOffset);
|
|
||||||
Preconditions.checkState(
|
|
||||||
skipped == targetReaderOffset - currentReaderOffset);
|
|
||||||
}
|
|
||||||
int result = 0;
|
|
||||||
for (ByteBufferStrategy strategy : strategies) {
|
|
||||||
result += readToBuffer(reader, datanode, strategy, currentBlock,
|
|
||||||
corruptedBlocks);
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
private int readToBuffer(BlockReader blockReader,
|
|
||||||
DatanodeInfo currentNode, ByteBufferStrategy strategy,
|
|
||||||
ExtendedBlock currentBlock,
|
|
||||||
CorruptedBlocks corruptedBlocks)
|
|
||||||
throws IOException {
|
|
||||||
final int targetLength = strategy.getTargetLength();
|
|
||||||
int length = 0;
|
|
||||||
try {
|
|
||||||
while (length < targetLength) {
|
|
||||||
int ret = strategy.readFromBlock(blockReader);
|
|
||||||
if (ret < 0) {
|
|
||||||
throw new IOException("Unexpected EOS from the reader");
|
|
||||||
}
|
|
||||||
length += ret;
|
|
||||||
}
|
|
||||||
return length;
|
|
||||||
} catch (ChecksumException ce) {
|
|
||||||
DFSClient.LOG.warn("Found Checksum error for "
|
|
||||||
+ currentBlock + " from " + currentNode
|
|
||||||
+ " at " + ce.getPos());
|
|
||||||
// we want to remember which block replicas we have tried
|
|
||||||
corruptedBlocks.addCorruptedBlock(currentBlock, currentNode);
|
|
||||||
throw ce;
|
|
||||||
} catch (IOException e) {
|
|
||||||
DFSClient.LOG.warn("Exception while reading from "
|
|
||||||
+ currentBlock + " of " + src + " from "
|
|
||||||
+ currentNode, e);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Seek to a new arbitrary location
|
* Seek to a new arbitrary location.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public synchronized void seek(long targetPos) throws IOException {
|
public synchronized void seek(long targetPos) throws IOException {
|
||||||
|
@ -469,7 +399,7 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Copy the data from {@link #curStripeBuf} into the given buffer
|
* Copy the data from {@link #curStripeBuf} into the given buffer.
|
||||||
* @param strategy the ReaderStrategy containing the given buffer
|
* @param strategy the ReaderStrategy containing the given buffer
|
||||||
* @param length target length
|
* @param length target length
|
||||||
* @return number of bytes copied
|
* @return number of bytes copied
|
||||||
|
@ -530,17 +460,19 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||||
|
|
||||||
AlignedStripe[] stripes = StripedBlockUtil.divideByteRangeIntoStripes(
|
AlignedStripe[] stripes = StripedBlockUtil.divideByteRangeIntoStripes(
|
||||||
ecPolicy, cellSize, blockGroup, start, end, buf);
|
ecPolicy, cellSize, blockGroup, start, end, buf);
|
||||||
CompletionService<Void> readService = new ExecutorCompletionService<>(
|
|
||||||
dfsClient.getStripedReadsThreadPool());
|
|
||||||
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
|
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
|
||||||
blockGroup, cellSize, dataBlkNum, parityBlkNum);
|
blockGroup, cellSize, dataBlkNum, parityBlkNum);
|
||||||
final BlockReaderInfo[] preaderInfos = new BlockReaderInfo[groupSize];
|
final BlockReaderInfo[] preaderInfos = new BlockReaderInfo[groupSize];
|
||||||
try {
|
try {
|
||||||
for (AlignedStripe stripe : stripes) {
|
for (AlignedStripe stripe : stripes) {
|
||||||
// Parse group to get chosen DN location
|
// Parse group to get chosen DN location
|
||||||
StripeReader preader = new PositionStripeReader(readService, stripe,
|
StripeReader preader = new PositionStripeReader(stripe, ecPolicy, blks,
|
||||||
blks, preaderInfos, corruptedBlocks);
|
preaderInfos, corruptedBlocks, decoder, this);
|
||||||
preader.readStripe();
|
try {
|
||||||
|
preader.readStripe();
|
||||||
|
} finally {
|
||||||
|
preader.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
buf.position(buf.position() + (int)(end - start + 1));
|
buf.position(buf.position() + (int)(end - start + 1));
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -570,376 +502,6 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* The reader for reading a complete {@link AlignedStripe}. Note that an
|
|
||||||
* {@link AlignedStripe} may cross multiple stripes with cellSize width.
|
|
||||||
*/
|
|
||||||
private abstract class StripeReader {
|
|
||||||
final Map<Future<Void>, Integer> futures = new HashMap<>();
|
|
||||||
final AlignedStripe alignedStripe;
|
|
||||||
final CompletionService<Void> service;
|
|
||||||
final LocatedBlock[] targetBlocks;
|
|
||||||
final CorruptedBlocks corruptedBlocks;
|
|
||||||
final BlockReaderInfo[] readerInfos;
|
|
||||||
|
|
||||||
StripeReader(CompletionService<Void> service, AlignedStripe alignedStripe,
|
|
||||||
LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos,
|
|
||||||
CorruptedBlocks corruptedBlocks) {
|
|
||||||
this.service = service;
|
|
||||||
this.alignedStripe = alignedStripe;
|
|
||||||
this.targetBlocks = targetBlocks;
|
|
||||||
this.readerInfos = readerInfos;
|
|
||||||
this.corruptedBlocks = corruptedBlocks;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** prepare all the data chunks */
|
|
||||||
abstract void prepareDecodeInputs();
|
|
||||||
|
|
||||||
/** prepare the parity chunk and block reader if necessary */
|
|
||||||
abstract boolean prepareParityChunk(int index);
|
|
||||||
|
|
||||||
abstract void decode();
|
|
||||||
|
|
||||||
void updateState4SuccessRead(StripingChunkReadResult result) {
|
|
||||||
Preconditions.checkArgument(
|
|
||||||
result.state == StripingChunkReadResult.SUCCESSFUL);
|
|
||||||
readerInfos[result.index].setOffset(alignedStripe.getOffsetInBlock()
|
|
||||||
+ alignedStripe.getSpanInBlock());
|
|
||||||
}
|
|
||||||
|
|
||||||
private void checkMissingBlocks() throws IOException {
|
|
||||||
if (alignedStripe.missingChunksNum > parityBlkNum) {
|
|
||||||
clearFutures(futures.keySet());
|
|
||||||
throw new IOException(alignedStripe.missingChunksNum
|
|
||||||
+ " missing blocks, the stripe is: " + alignedStripe
|
|
||||||
+ "; locatedBlocks is: " + locatedBlocks);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* We need decoding. Thus go through all the data chunks and make sure we
|
|
||||||
* submit read requests for all of them.
|
|
||||||
*/
|
|
||||||
private void readDataForDecoding() throws IOException {
|
|
||||||
prepareDecodeInputs();
|
|
||||||
for (int i = 0; i < dataBlkNum; i++) {
|
|
||||||
Preconditions.checkNotNull(alignedStripe.chunks[i]);
|
|
||||||
if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) {
|
|
||||||
if (!readChunk(targetBlocks[i], i)) {
|
|
||||||
alignedStripe.missingChunksNum++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
checkMissingBlocks();
|
|
||||||
}
|
|
||||||
|
|
||||||
void readParityChunks(int num) throws IOException {
|
|
||||||
for (int i = dataBlkNum, j = 0; i < dataBlkNum + parityBlkNum && j < num;
|
|
||||||
i++) {
|
|
||||||
if (alignedStripe.chunks[i] == null) {
|
|
||||||
if (prepareParityChunk(i) && readChunk(targetBlocks[i], i)) {
|
|
||||||
j++;
|
|
||||||
} else {
|
|
||||||
alignedStripe.missingChunksNum++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
checkMissingBlocks();
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean createBlockReader(LocatedBlock block, int chunkIndex)
|
|
||||||
throws IOException {
|
|
||||||
BlockReader reader = null;
|
|
||||||
final ReaderRetryPolicy retry = new ReaderRetryPolicy();
|
|
||||||
DNAddrPair dnInfo = new DNAddrPair(null, null, null);
|
|
||||||
|
|
||||||
while(true) {
|
|
||||||
try {
|
|
||||||
// the cached block location might have been re-fetched, so always
|
|
||||||
// get it from cache.
|
|
||||||
block = refreshLocatedBlock(block);
|
|
||||||
targetBlocks[chunkIndex] = block;
|
|
||||||
|
|
||||||
// internal block has one location, just rule out the deadNodes
|
|
||||||
dnInfo = getBestNodeDNAddrPair(block, null);
|
|
||||||
if (dnInfo == null) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
reader = getBlockReader(block, alignedStripe.getOffsetInBlock(),
|
|
||||||
block.getBlockSize() - alignedStripe.getOffsetInBlock(),
|
|
||||||
dnInfo.addr, dnInfo.storageType, dnInfo.info);
|
|
||||||
} catch (IOException e) {
|
|
||||||
if (e instanceof InvalidEncryptionKeyException &&
|
|
||||||
retry.shouldRefetchEncryptionKey()) {
|
|
||||||
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
|
|
||||||
+ "encryption key was invalid when connecting to " + dnInfo.addr
|
|
||||||
+ " : " + e);
|
|
||||||
dfsClient.clearDataEncryptionKey();
|
|
||||||
retry.refetchEncryptionKey();
|
|
||||||
} else if (retry.shouldRefetchToken() &&
|
|
||||||
tokenRefetchNeeded(e, dnInfo.addr)) {
|
|
||||||
fetchBlockAt(block.getStartOffset());
|
|
||||||
retry.refetchToken();
|
|
||||||
} else {
|
|
||||||
//TODO: handles connection issues
|
|
||||||
DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " +
|
|
||||||
"block" + block.getBlock(), e);
|
|
||||||
// re-fetch the block in case the block has been moved
|
|
||||||
fetchBlockAt(block.getStartOffset());
|
|
||||||
addToDeadNodes(dnInfo.info);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (reader != null) {
|
|
||||||
readerInfos[chunkIndex] = new BlockReaderInfo(reader, dnInfo.info,
|
|
||||||
alignedStripe.getOffsetInBlock());
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) {
|
|
||||||
if (chunk.useByteBuffer()) {
|
|
||||||
ByteBufferStrategy strategy = new ByteBufferStrategy(
|
|
||||||
chunk.getByteBuffer(), readStatistics, dfsClient);
|
|
||||||
return new ByteBufferStrategy[]{strategy};
|
|
||||||
} else {
|
|
||||||
ByteBufferStrategy[] strategies =
|
|
||||||
new ByteBufferStrategy[chunk.getChunkBuffer().getSlices().size()];
|
|
||||||
for (int i = 0; i < strategies.length; i++) {
|
|
||||||
ByteBuffer buffer = chunk.getChunkBuffer().getSlice(i);
|
|
||||||
strategies[i] =
|
|
||||||
new ByteBufferStrategy(buffer, readStatistics, dfsClient);
|
|
||||||
}
|
|
||||||
return strategies;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean readChunk(final LocatedBlock block, int chunkIndex)
|
|
||||||
throws IOException {
|
|
||||||
final StripingChunk chunk = alignedStripe.chunks[chunkIndex];
|
|
||||||
if (block == null) {
|
|
||||||
chunk.state = StripingChunk.MISSING;
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (readerInfos[chunkIndex] == null) {
|
|
||||||
if (!createBlockReader(block, chunkIndex)) {
|
|
||||||
chunk.state = StripingChunk.MISSING;
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
} else if (readerInfos[chunkIndex].shouldSkip) {
|
|
||||||
chunk.state = StripingChunk.MISSING;
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
chunk.state = StripingChunk.PENDING;
|
|
||||||
Callable<Void> readCallable = readCells(readerInfos[chunkIndex].reader,
|
|
||||||
readerInfos[chunkIndex].datanode,
|
|
||||||
readerInfos[chunkIndex].blockReaderOffset,
|
|
||||||
alignedStripe.getOffsetInBlock(), getReadStrategies(chunk),
|
|
||||||
block.getBlock(), corruptedBlocks);
|
|
||||||
|
|
||||||
Future<Void> request = service.submit(readCallable);
|
|
||||||
futures.put(request, chunkIndex);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** read the whole stripe. do decoding if necessary */
|
|
||||||
void readStripe() throws IOException {
|
|
||||||
for (int i = 0; i < dataBlkNum; i++) {
|
|
||||||
if (alignedStripe.chunks[i] != null &&
|
|
||||||
alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
|
|
||||||
if (!readChunk(targetBlocks[i], i)) {
|
|
||||||
alignedStripe.missingChunksNum++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// There are missing block locations at this stage. Thus we need to read
|
|
||||||
// the full stripe and one more parity block.
|
|
||||||
if (alignedStripe.missingChunksNum > 0) {
|
|
||||||
checkMissingBlocks();
|
|
||||||
readDataForDecoding();
|
|
||||||
// read parity chunks
|
|
||||||
readParityChunks(alignedStripe.missingChunksNum);
|
|
||||||
}
|
|
||||||
// TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks
|
|
||||||
|
|
||||||
// Input buffers for potential decode operation, which remains null until
|
|
||||||
// first read failure
|
|
||||||
while (!futures.isEmpty()) {
|
|
||||||
try {
|
|
||||||
StripingChunkReadResult r = StripedBlockUtil
|
|
||||||
.getNextCompletedStripedRead(service, futures, 0);
|
|
||||||
if (DFSClient.LOG.isDebugEnabled()) {
|
|
||||||
DFSClient.LOG.debug("Read task returned: " + r + ", for stripe "
|
|
||||||
+ alignedStripe);
|
|
||||||
}
|
|
||||||
StripingChunk returnedChunk = alignedStripe.chunks[r.index];
|
|
||||||
Preconditions.checkNotNull(returnedChunk);
|
|
||||||
Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING);
|
|
||||||
|
|
||||||
if (r.state == StripingChunkReadResult.SUCCESSFUL) {
|
|
||||||
returnedChunk.state = StripingChunk.FETCHED;
|
|
||||||
alignedStripe.fetchedChunksNum++;
|
|
||||||
updateState4SuccessRead(r);
|
|
||||||
if (alignedStripe.fetchedChunksNum == dataBlkNum) {
|
|
||||||
clearFutures(futures.keySet());
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
returnedChunk.state = StripingChunk.MISSING;
|
|
||||||
// close the corresponding reader
|
|
||||||
closeReader(readerInfos[r.index]);
|
|
||||||
|
|
||||||
final int missing = alignedStripe.missingChunksNum;
|
|
||||||
alignedStripe.missingChunksNum++;
|
|
||||||
checkMissingBlocks();
|
|
||||||
|
|
||||||
readDataForDecoding();
|
|
||||||
readParityChunks(alignedStripe.missingChunksNum - missing);
|
|
||||||
}
|
|
||||||
} catch (InterruptedException ie) {
|
|
||||||
String err = "Read request interrupted";
|
|
||||||
DFSClient.LOG.error(err);
|
|
||||||
clearFutures(futures.keySet());
|
|
||||||
// Don't decode if read interrupted
|
|
||||||
throw new InterruptedIOException(err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (alignedStripe.missingChunksNum > 0) {
|
|
||||||
decode();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class PositionStripeReader extends StripeReader {
|
|
||||||
private ByteBuffer[] decodeInputs = null;
|
|
||||||
|
|
||||||
PositionStripeReader(CompletionService<Void> service,
|
|
||||||
AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
|
|
||||||
BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks) {
|
|
||||||
super(service, alignedStripe, targetBlocks, readerInfos,
|
|
||||||
corruptedBlocks);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void prepareDecodeInputs() {
|
|
||||||
if (decodeInputs == null) {
|
|
||||||
decodeInputs = StripedBlockUtil.initDecodeInputs(alignedStripe,
|
|
||||||
dataBlkNum, parityBlkNum);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
boolean prepareParityChunk(int index) {
|
|
||||||
Preconditions.checkState(index >= dataBlkNum &&
|
|
||||||
alignedStripe.chunks[index] == null);
|
|
||||||
alignedStripe.chunks[index] = new StripingChunk(decodeInputs[index]);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void decode() {
|
|
||||||
StripedBlockUtil.finalizeDecodeInputs(decodeInputs, alignedStripe);
|
|
||||||
StripedBlockUtil.decodeAndFillBuffer(decodeInputs, alignedStripe,
|
|
||||||
dataBlkNum, parityBlkNum, decoder);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class StatefulStripeReader extends StripeReader {
|
|
||||||
ByteBuffer[] decodeInputs;
|
|
||||||
|
|
||||||
StatefulStripeReader(CompletionService<Void> service,
|
|
||||||
AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
|
|
||||||
BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks) {
|
|
||||||
super(service, alignedStripe, targetBlocks, readerInfos,
|
|
||||||
corruptedBlocks);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void prepareDecodeInputs() {
|
|
||||||
if (decodeInputs == null) {
|
|
||||||
decodeInputs = new ByteBuffer[dataBlkNum + parityBlkNum];
|
|
||||||
final ByteBuffer cur;
|
|
||||||
synchronized (DFSStripedInputStream.this) {
|
|
||||||
cur = curStripeBuf.duplicate();
|
|
||||||
}
|
|
||||||
StripedBlockUtil.VerticalRange range = alignedStripe.range;
|
|
||||||
for (int i = 0; i < dataBlkNum; i++) {
|
|
||||||
cur.limit(cur.capacity());
|
|
||||||
int pos = (int) (range.offsetInBlock % cellSize + cellSize * i);
|
|
||||||
cur.position(pos);
|
|
||||||
cur.limit((int) (pos + range.spanInBlock));
|
|
||||||
decodeInputs[i] = cur.slice();
|
|
||||||
if (alignedStripe.chunks[i] == null) {
|
|
||||||
alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
boolean prepareParityChunk(int index) {
|
|
||||||
Preconditions.checkState(index >= dataBlkNum
|
|
||||||
&& alignedStripe.chunks[index] == null);
|
|
||||||
if (blockReaders[index] != null && blockReaders[index].shouldSkip) {
|
|
||||||
alignedStripe.chunks[index] = new StripingChunk(StripingChunk.MISSING);
|
|
||||||
// we have failed the block reader before
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
final int parityIndex = index - dataBlkNum;
|
|
||||||
ByteBuffer buf = getParityBuffer().duplicate();
|
|
||||||
buf.position(cellSize * parityIndex);
|
|
||||||
buf.limit(cellSize * parityIndex + (int) alignedStripe.range.spanInBlock);
|
|
||||||
decodeInputs[index] = buf.slice();
|
|
||||||
alignedStripe.chunks[index] = new StripingChunk(decodeInputs[index]);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void decode() {
|
|
||||||
final int span = (int) alignedStripe.getSpanInBlock();
|
|
||||||
for (int i = 0; i < alignedStripe.chunks.length; i++) {
|
|
||||||
if (alignedStripe.chunks[i] != null &&
|
|
||||||
alignedStripe.chunks[i].state == StripingChunk.ALLZERO) {
|
|
||||||
for (int j = 0; j < span; j++) {
|
|
||||||
decodeInputs[i].put((byte) 0);
|
|
||||||
}
|
|
||||||
decodeInputs[i].flip();
|
|
||||||
} else if (alignedStripe.chunks[i] != null &&
|
|
||||||
alignedStripe.chunks[i].state == StripingChunk.FETCHED) {
|
|
||||||
decodeInputs[i].position(0);
|
|
||||||
decodeInputs[i].limit(span);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
int[] decodeIndices = new int[parityBlkNum];
|
|
||||||
int pos = 0;
|
|
||||||
for (int i = 0; i < alignedStripe.chunks.length; i++) {
|
|
||||||
if (alignedStripe.chunks[i] != null &&
|
|
||||||
alignedStripe.chunks[i].state == StripingChunk.MISSING) {
|
|
||||||
if (i < dataBlkNum) {
|
|
||||||
decodeIndices[pos++] = i;
|
|
||||||
} else {
|
|
||||||
decodeInputs[i] = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
decodeIndices = Arrays.copyOf(decodeIndices, pos);
|
|
||||||
|
|
||||||
final int decodeChunkNum = decodeIndices.length;
|
|
||||||
ByteBuffer[] outputs = new ByteBuffer[decodeChunkNum];
|
|
||||||
for (int i = 0; i < decodeChunkNum; i++) {
|
|
||||||
outputs[i] = decodeInputs[decodeIndices[i]];
|
|
||||||
outputs[i].position(0);
|
|
||||||
outputs[i].limit((int) alignedStripe.range.spanInBlock);
|
|
||||||
decodeInputs[decodeIndices[i]] = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
decoder.decode(decodeInputs, decodeIndices, outputs);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* May need online read recovery, zero-copy read doesn't make
|
* May need online read recovery, zero-copy read doesn't make
|
||||||
* sense, so don't support it.
|
* sense, so don't support it.
|
||||||
|
@ -957,12 +519,4 @@ public class DFSStripedInputStream extends DFSInputStream {
|
||||||
throw new UnsupportedOperationException(
|
throw new UnsupportedOperationException(
|
||||||
"Not support enhanced byte buffer access.");
|
"Not support enhanced byte buffer access.");
|
||||||
}
|
}
|
||||||
|
|
||||||
/** A variation to {@link DFSInputStream#cancelAll} */
|
|
||||||
private void clearFutures(Collection<Future<Void>> futures) {
|
|
||||||
for (Future<Void> future : futures) {
|
|
||||||
future.cancel(false);
|
|
||||||
}
|
|
||||||
futures.clear();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,104 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.commons.configuration.SystemConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||||
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
|
||||||
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
|
||||||
|
import org.apache.hadoop.io.erasurecode.ECChunk;
|
||||||
|
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The reader for reading a complete {@link StripedBlockUtil.AlignedStripe}
|
||||||
|
* which may cross multiple stripes with cellSize width.
|
||||||
|
*/
|
||||||
|
class PositionStripeReader extends StripeReader {
|
||||||
|
private ByteBuffer codingBuffer;
|
||||||
|
|
||||||
|
PositionStripeReader(AlignedStripe alignedStripe,
|
||||||
|
ErasureCodingPolicy ecPolicy, LocatedBlock[] targetBlocks,
|
||||||
|
BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks,
|
||||||
|
RawErasureDecoder decoder, DFSStripedInputStream dfsStripedInputStream) {
|
||||||
|
super(alignedStripe, ecPolicy, targetBlocks, readerInfos,
|
||||||
|
corruptedBlocks, decoder, dfsStripedInputStream);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void prepareDecodeInputs() {
|
||||||
|
if (codingBuffer == null) {
|
||||||
|
this.decodeInputs = new ECChunk[dataBlkNum + parityBlkNum];
|
||||||
|
initDecodeInputs(alignedStripe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
boolean prepareParityChunk(int index) {
|
||||||
|
Preconditions.checkState(index >= dataBlkNum &&
|
||||||
|
alignedStripe.chunks[index] == null);
|
||||||
|
|
||||||
|
alignedStripe.chunks[index] =
|
||||||
|
new StripingChunk(decodeInputs[index].getBuffer());
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void decode() {
|
||||||
|
finalizeDecodeInputs();
|
||||||
|
decodeAndFillBuffer(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
void initDecodeInputs(AlignedStripe alignedStripe) {
|
||||||
|
int bufLen = (int) alignedStripe.getSpanInBlock();
|
||||||
|
int bufCount = dataBlkNum + parityBlkNum;
|
||||||
|
codingBuffer = dfsStripedInputStream.getBufferPool().
|
||||||
|
getBuffer(useDirectBuffer(), bufLen * bufCount);
|
||||||
|
ByteBuffer buffer;
|
||||||
|
for (int i = 0; i < decodeInputs.length; i++) {
|
||||||
|
buffer = codingBuffer.duplicate();
|
||||||
|
decodeInputs[i] = new ECChunk(buffer, i * bufLen, bufLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < dataBlkNum; i++) {
|
||||||
|
if (alignedStripe.chunks[i] == null) {
|
||||||
|
alignedStripe.chunks[i] =
|
||||||
|
new StripingChunk(decodeInputs[i].getBuffer());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void close() {
|
||||||
|
if (decodeInputs != null) {
|
||||||
|
for (int i = 0; i < decodeInputs.length; i++) {
|
||||||
|
decodeInputs[i] = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (codingBuffer != null) {
|
||||||
|
dfsStripedInputStream.getBufferPool().putBuffer(codingBuffer);
|
||||||
|
codingBuffer = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,95 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||||
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
|
||||||
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
|
||||||
|
import org.apache.hadoop.io.erasurecode.ECChunk;
|
||||||
|
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The reader for reading a complete {@link StripedBlockUtil.AlignedStripe}
|
||||||
|
* which belongs to a single stripe.
|
||||||
|
* Reading cross multiple strips is not supported in this reader.
|
||||||
|
*/
|
||||||
|
class StatefulStripeReader extends StripeReader {
|
||||||
|
|
||||||
|
StatefulStripeReader(AlignedStripe alignedStripe,
|
||||||
|
ErasureCodingPolicy ecPolicy, LocatedBlock[] targetBlocks,
|
||||||
|
BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks,
|
||||||
|
RawErasureDecoder decoder, DFSStripedInputStream dfsStripedInputStream) {
|
||||||
|
super(alignedStripe, ecPolicy, targetBlocks, readerInfos,
|
||||||
|
corruptedBlocks, decoder, dfsStripedInputStream);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void prepareDecodeInputs() {
|
||||||
|
final ByteBuffer cur;
|
||||||
|
synchronized (dfsStripedInputStream) {
|
||||||
|
cur = dfsStripedInputStream.getCurStripeBuf().duplicate();
|
||||||
|
}
|
||||||
|
|
||||||
|
this.decodeInputs = new ECChunk[dataBlkNum + parityBlkNum];
|
||||||
|
int bufLen = (int) alignedStripe.getSpanInBlock();
|
||||||
|
int bufOff = (int) alignedStripe.getOffsetInBlock();
|
||||||
|
for (int i = 0; i < dataBlkNum; i++) {
|
||||||
|
cur.limit(cur.capacity());
|
||||||
|
int pos = bufOff % cellSize + cellSize * i;
|
||||||
|
cur.position(pos);
|
||||||
|
cur.limit(pos + bufLen);
|
||||||
|
decodeInputs[i] = new ECChunk(cur.slice(), 0, bufLen);
|
||||||
|
if (alignedStripe.chunks[i] == null) {
|
||||||
|
alignedStripe.chunks[i] =
|
||||||
|
new StripingChunk(decodeInputs[i].getBuffer());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
boolean prepareParityChunk(int index) {
|
||||||
|
Preconditions.checkState(index >= dataBlkNum
|
||||||
|
&& alignedStripe.chunks[index] == null);
|
||||||
|
if (readerInfos[index] != null && readerInfos[index].shouldSkip) {
|
||||||
|
alignedStripe.chunks[index] = new StripingChunk(StripingChunk.MISSING);
|
||||||
|
// we have failed the block reader before
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
final int parityIndex = index - dataBlkNum;
|
||||||
|
ByteBuffer buf = dfsStripedInputStream.getParityBuffer().duplicate();
|
||||||
|
buf.position(cellSize * parityIndex);
|
||||||
|
buf.limit(cellSize * parityIndex + (int) alignedStripe.range.spanInBlock);
|
||||||
|
decodeInputs[index] =
|
||||||
|
new ECChunk(buf.slice(), 0, (int) alignedStripe.range.spanInBlock);
|
||||||
|
alignedStripe.chunks[index] =
|
||||||
|
new StripingChunk(decodeInputs[index].getBuffer());
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void decode() {
|
||||||
|
finalizeDecodeInputs();
|
||||||
|
decodeAndFillBuffer(false);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,463 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hadoop.fs.ChecksumException;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||||
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
|
||||||
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
|
||||||
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
|
||||||
|
import org.apache.hadoop.io.erasurecode.ECChunk;
|
||||||
|
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InterruptedIOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.CompletionService;
|
||||||
|
import java.util.concurrent.ExecutorCompletionService;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The reader for reading a complete {@link StripedBlockUtil.AlignedStripe}.
|
||||||
|
* Note that an {@link StripedBlockUtil.AlignedStripe} may cross multiple
|
||||||
|
* stripes with cellSize width.
|
||||||
|
*/
|
||||||
|
abstract class StripeReader {
|
||||||
|
|
||||||
|
static class ReaderRetryPolicy {
|
||||||
|
private int fetchEncryptionKeyTimes = 1;
|
||||||
|
private int fetchTokenTimes = 1;
|
||||||
|
|
||||||
|
void refetchEncryptionKey() {
|
||||||
|
fetchEncryptionKeyTimes--;
|
||||||
|
}
|
||||||
|
|
||||||
|
void refetchToken() {
|
||||||
|
fetchTokenTimes--;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean shouldRefetchEncryptionKey() {
|
||||||
|
return fetchEncryptionKeyTimes > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean shouldRefetchToken() {
|
||||||
|
return fetchTokenTimes > 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class BlockReaderInfo {
|
||||||
|
final BlockReader reader;
|
||||||
|
final DatanodeInfo datanode;
|
||||||
|
/**
|
||||||
|
* when initializing block readers, their starting offsets are set to the
|
||||||
|
* same number: the smallest internal block offsets among all the readers.
|
||||||
|
* This is because it is possible that for some internal blocks we have to
|
||||||
|
* read "backwards" for decoding purpose. We thus use this offset array to
|
||||||
|
* track offsets for all the block readers so that we can skip data if
|
||||||
|
* necessary.
|
||||||
|
*/
|
||||||
|
long blockReaderOffset;
|
||||||
|
/**
|
||||||
|
* We use this field to indicate whether we should use this reader. In case
|
||||||
|
* we hit any issue with this reader, we set this field to true and avoid
|
||||||
|
* using it for the next stripe.
|
||||||
|
*/
|
||||||
|
boolean shouldSkip = false;
|
||||||
|
|
||||||
|
BlockReaderInfo(BlockReader reader, DatanodeInfo dn, long offset) {
|
||||||
|
this.reader = reader;
|
||||||
|
this.datanode = dn;
|
||||||
|
this.blockReaderOffset = offset;
|
||||||
|
}
|
||||||
|
|
||||||
|
void setOffset(long offset) {
|
||||||
|
this.blockReaderOffset = offset;
|
||||||
|
}
|
||||||
|
|
||||||
|
void skip() {
|
||||||
|
this.shouldSkip = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected final Map<Future<Void>, Integer> futures = new HashMap<>();
|
||||||
|
protected final AlignedStripe alignedStripe;
|
||||||
|
protected final CompletionService<Void> service;
|
||||||
|
protected final LocatedBlock[] targetBlocks;
|
||||||
|
protected final CorruptedBlocks corruptedBlocks;
|
||||||
|
protected final BlockReaderInfo[] readerInfos;
|
||||||
|
protected final ErasureCodingPolicy ecPolicy;
|
||||||
|
protected final short dataBlkNum;
|
||||||
|
protected final short parityBlkNum;
|
||||||
|
protected final int cellSize;
|
||||||
|
protected final RawErasureDecoder decoder;
|
||||||
|
protected final DFSStripedInputStream dfsStripedInputStream;
|
||||||
|
|
||||||
|
protected ECChunk[] decodeInputs;
|
||||||
|
|
||||||
|
StripeReader(AlignedStripe alignedStripe,
|
||||||
|
ErasureCodingPolicy ecPolicy, LocatedBlock[] targetBlocks,
|
||||||
|
BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks,
|
||||||
|
RawErasureDecoder decoder,
|
||||||
|
DFSStripedInputStream dfsStripedInputStream) {
|
||||||
|
this.alignedStripe = alignedStripe;
|
||||||
|
this.ecPolicy = ecPolicy;
|
||||||
|
this.dataBlkNum = (short)ecPolicy.getNumDataUnits();
|
||||||
|
this.parityBlkNum = (short)ecPolicy.getNumParityUnits();
|
||||||
|
this.cellSize = ecPolicy.getCellSize();
|
||||||
|
this.targetBlocks = targetBlocks;
|
||||||
|
this.readerInfos = readerInfos;
|
||||||
|
this.corruptedBlocks = corruptedBlocks;
|
||||||
|
this.decoder = decoder;
|
||||||
|
this.dfsStripedInputStream = dfsStripedInputStream;
|
||||||
|
|
||||||
|
service = new ExecutorCompletionService<>(
|
||||||
|
dfsStripedInputStream.getStripedReadsThreadPool());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Prepare all the data chunks.
|
||||||
|
*/
|
||||||
|
abstract void prepareDecodeInputs();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Prepare the parity chunk and block reader if necessary.
|
||||||
|
*/
|
||||||
|
abstract boolean prepareParityChunk(int index);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Decode to get the missing data.
|
||||||
|
*/
|
||||||
|
abstract void decode();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Default close do nothing.
|
||||||
|
*/
|
||||||
|
void close() {
|
||||||
|
}
|
||||||
|
|
||||||
|
void updateState4SuccessRead(StripingChunkReadResult result) {
|
||||||
|
Preconditions.checkArgument(
|
||||||
|
result.state == StripingChunkReadResult.SUCCESSFUL);
|
||||||
|
readerInfos[result.index].setOffset(alignedStripe.getOffsetInBlock()
|
||||||
|
+ alignedStripe.getSpanInBlock());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkMissingBlocks() throws IOException {
|
||||||
|
if (alignedStripe.missingChunksNum > parityBlkNum) {
|
||||||
|
clearFutures();
|
||||||
|
throw new IOException(alignedStripe.missingChunksNum
|
||||||
|
+ " missing blocks, the stripe is: " + alignedStripe
|
||||||
|
+ "; locatedBlocks is: " + dfsStripedInputStream.getLocatedBlocks());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We need decoding. Thus go through all the data chunks and make sure we
|
||||||
|
* submit read requests for all of them.
|
||||||
|
*/
|
||||||
|
private void readDataForDecoding() throws IOException {
|
||||||
|
prepareDecodeInputs();
|
||||||
|
for (int i = 0; i < dataBlkNum; i++) {
|
||||||
|
Preconditions.checkNotNull(alignedStripe.chunks[i]);
|
||||||
|
if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) {
|
||||||
|
if (!readChunk(targetBlocks[i], i)) {
|
||||||
|
alignedStripe.missingChunksNum++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
checkMissingBlocks();
|
||||||
|
}
|
||||||
|
|
||||||
|
void readParityChunks(int num) throws IOException {
|
||||||
|
for (int i = dataBlkNum, j = 0; i < dataBlkNum + parityBlkNum && j < num;
|
||||||
|
i++) {
|
||||||
|
if (alignedStripe.chunks[i] == null) {
|
||||||
|
if (prepareParityChunk(i) && readChunk(targetBlocks[i], i)) {
|
||||||
|
j++;
|
||||||
|
} else {
|
||||||
|
alignedStripe.missingChunksNum++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
checkMissingBlocks();
|
||||||
|
}
|
||||||
|
|
||||||
|
private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) {
|
||||||
|
if (chunk.useByteBuffer()) {
|
||||||
|
ByteBufferStrategy strategy = new ByteBufferStrategy(
|
||||||
|
chunk.getByteBuffer(), dfsStripedInputStream.getReadStatistics(),
|
||||||
|
dfsStripedInputStream.getDFSClient());
|
||||||
|
return new ByteBufferStrategy[]{strategy};
|
||||||
|
}
|
||||||
|
|
||||||
|
ByteBufferStrategy[] strategies =
|
||||||
|
new ByteBufferStrategy[chunk.getChunkBuffer().getSlices().size()];
|
||||||
|
for (int i = 0; i < strategies.length; i++) {
|
||||||
|
ByteBuffer buffer = chunk.getChunkBuffer().getSlice(i);
|
||||||
|
strategies[i] = new ByteBufferStrategy(buffer,
|
||||||
|
dfsStripedInputStream.getReadStatistics(),
|
||||||
|
dfsStripedInputStream.getDFSClient());
|
||||||
|
}
|
||||||
|
return strategies;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int readToBuffer(BlockReader blockReader,
|
||||||
|
DatanodeInfo currentNode, ByteBufferStrategy strategy,
|
||||||
|
ExtendedBlock currentBlock) throws IOException {
|
||||||
|
final int targetLength = strategy.getTargetLength();
|
||||||
|
int length = 0;
|
||||||
|
try {
|
||||||
|
while (length < targetLength) {
|
||||||
|
int ret = strategy.readFromBlock(blockReader);
|
||||||
|
if (ret < 0) {
|
||||||
|
throw new IOException("Unexpected EOS from the reader");
|
||||||
|
}
|
||||||
|
length += ret;
|
||||||
|
}
|
||||||
|
return length;
|
||||||
|
} catch (ChecksumException ce) {
|
||||||
|
DFSClient.LOG.warn("Found Checksum error for "
|
||||||
|
+ currentBlock + " from " + currentNode
|
||||||
|
+ " at " + ce.getPos());
|
||||||
|
// we want to remember which block replicas we have tried
|
||||||
|
corruptedBlocks.addCorruptedBlock(currentBlock, currentNode);
|
||||||
|
throw ce;
|
||||||
|
} catch (IOException e) {
|
||||||
|
DFSClient.LOG.warn("Exception while reading from "
|
||||||
|
+ currentBlock + " of " + dfsStripedInputStream.getSrc() + " from "
|
||||||
|
+ currentNode, e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Callable<Void> readCells(final BlockReader reader,
|
||||||
|
final DatanodeInfo datanode, final long currentReaderOffset,
|
||||||
|
final long targetReaderOffset, final ByteBufferStrategy[] strategies,
|
||||||
|
final ExtendedBlock currentBlock) {
|
||||||
|
return () -> {
|
||||||
|
// reader can be null if getBlockReaderWithRetry failed or
|
||||||
|
// the reader hit exception before
|
||||||
|
if (reader == null) {
|
||||||
|
throw new IOException("The BlockReader is null. " +
|
||||||
|
"The BlockReader creation failed or the reader hit exception.");
|
||||||
|
}
|
||||||
|
Preconditions.checkState(currentReaderOffset <= targetReaderOffset);
|
||||||
|
if (currentReaderOffset < targetReaderOffset) {
|
||||||
|
long skipped = reader.skip(targetReaderOffset - currentReaderOffset);
|
||||||
|
Preconditions.checkState(
|
||||||
|
skipped == targetReaderOffset - currentReaderOffset);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (ByteBufferStrategy strategy : strategies) {
|
||||||
|
readToBuffer(reader, datanode, strategy, currentBlock);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean readChunk(final LocatedBlock block, int chunkIndex)
|
||||||
|
throws IOException {
|
||||||
|
final StripingChunk chunk = alignedStripe.chunks[chunkIndex];
|
||||||
|
if (block == null) {
|
||||||
|
chunk.state = StripingChunk.MISSING;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (readerInfos[chunkIndex] == null) {
|
||||||
|
if (!dfsStripedInputStream.createBlockReader(block,
|
||||||
|
alignedStripe.getOffsetInBlock(), targetBlocks,
|
||||||
|
readerInfos, chunkIndex)) {
|
||||||
|
chunk.state = StripingChunk.MISSING;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} else if (readerInfos[chunkIndex].shouldSkip) {
|
||||||
|
chunk.state = StripingChunk.MISSING;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
chunk.state = StripingChunk.PENDING;
|
||||||
|
Callable<Void> readCallable = readCells(readerInfos[chunkIndex].reader,
|
||||||
|
readerInfos[chunkIndex].datanode,
|
||||||
|
readerInfos[chunkIndex].blockReaderOffset,
|
||||||
|
alignedStripe.getOffsetInBlock(), getReadStrategies(chunk),
|
||||||
|
block.getBlock());
|
||||||
|
|
||||||
|
Future<Void> request = service.submit(readCallable);
|
||||||
|
futures.put(request, chunkIndex);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* read the whole stripe. do decoding if necessary
|
||||||
|
*/
|
||||||
|
void readStripe() throws IOException {
|
||||||
|
for (int i = 0; i < dataBlkNum; i++) {
|
||||||
|
if (alignedStripe.chunks[i] != null &&
|
||||||
|
alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
|
||||||
|
if (!readChunk(targetBlocks[i], i)) {
|
||||||
|
alignedStripe.missingChunksNum++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// There are missing block locations at this stage. Thus we need to read
|
||||||
|
// the full stripe and one more parity block.
|
||||||
|
if (alignedStripe.missingChunksNum > 0) {
|
||||||
|
checkMissingBlocks();
|
||||||
|
readDataForDecoding();
|
||||||
|
// read parity chunks
|
||||||
|
readParityChunks(alignedStripe.missingChunksNum);
|
||||||
|
}
|
||||||
|
// TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks
|
||||||
|
|
||||||
|
// Input buffers for potential decode operation, which remains null until
|
||||||
|
// first read failure
|
||||||
|
while (!futures.isEmpty()) {
|
||||||
|
try {
|
||||||
|
StripingChunkReadResult r = StripedBlockUtil
|
||||||
|
.getNextCompletedStripedRead(service, futures, 0);
|
||||||
|
if (DFSClient.LOG.isDebugEnabled()) {
|
||||||
|
DFSClient.LOG.debug("Read task returned: " + r + ", for stripe "
|
||||||
|
+ alignedStripe);
|
||||||
|
}
|
||||||
|
StripingChunk returnedChunk = alignedStripe.chunks[r.index];
|
||||||
|
Preconditions.checkNotNull(returnedChunk);
|
||||||
|
Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING);
|
||||||
|
|
||||||
|
if (r.state == StripingChunkReadResult.SUCCESSFUL) {
|
||||||
|
returnedChunk.state = StripingChunk.FETCHED;
|
||||||
|
alignedStripe.fetchedChunksNum++;
|
||||||
|
updateState4SuccessRead(r);
|
||||||
|
if (alignedStripe.fetchedChunksNum == dataBlkNum) {
|
||||||
|
clearFutures();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
returnedChunk.state = StripingChunk.MISSING;
|
||||||
|
// close the corresponding reader
|
||||||
|
dfsStripedInputStream.closeReader(readerInfos[r.index]);
|
||||||
|
|
||||||
|
final int missing = alignedStripe.missingChunksNum;
|
||||||
|
alignedStripe.missingChunksNum++;
|
||||||
|
checkMissingBlocks();
|
||||||
|
|
||||||
|
readDataForDecoding();
|
||||||
|
readParityChunks(alignedStripe.missingChunksNum - missing);
|
||||||
|
}
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
String err = "Read request interrupted";
|
||||||
|
DFSClient.LOG.error(err);
|
||||||
|
clearFutures();
|
||||||
|
// Don't decode if read interrupted
|
||||||
|
throw new InterruptedIOException(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (alignedStripe.missingChunksNum > 0) {
|
||||||
|
decode();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Some fetched {@link StripingChunk} might be stored in original application
|
||||||
|
* buffer instead of prepared decode input buffers. Some others are beyond
|
||||||
|
* the range of the internal blocks and should correspond to all zero bytes.
|
||||||
|
* When all pending requests have returned, this method should be called to
|
||||||
|
* finalize decode input buffers.
|
||||||
|
*/
|
||||||
|
|
||||||
|
void finalizeDecodeInputs() {
|
||||||
|
for (int i = 0; i < alignedStripe.chunks.length; i++) {
|
||||||
|
final StripingChunk chunk = alignedStripe.chunks[i];
|
||||||
|
if (chunk != null && chunk.state == StripingChunk.FETCHED) {
|
||||||
|
if (chunk.useChunkBuffer()) {
|
||||||
|
chunk.getChunkBuffer().copyTo(decodeInputs[i].getBuffer());
|
||||||
|
} else {
|
||||||
|
chunk.getByteBuffer().flip();
|
||||||
|
}
|
||||||
|
} else if (chunk != null && chunk.state == StripingChunk.ALLZERO) {
|
||||||
|
decodeInputs[i].setAllZero(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Decode based on the given input buffers and erasure coding policy.
|
||||||
|
*/
|
||||||
|
void decodeAndFillBuffer(boolean fillBuffer) {
|
||||||
|
// Step 1: prepare indices and output buffers for missing data units
|
||||||
|
int[] decodeIndices = prepareErasedIndices();
|
||||||
|
|
||||||
|
final int decodeChunkNum = decodeIndices.length;
|
||||||
|
ECChunk[] outputs = new ECChunk[decodeChunkNum];
|
||||||
|
for (int i = 0; i < decodeChunkNum; i++) {
|
||||||
|
outputs[i] = decodeInputs[decodeIndices[i]];
|
||||||
|
decodeInputs[decodeIndices[i]] = null;
|
||||||
|
}
|
||||||
|
// Step 2: decode into prepared output buffers
|
||||||
|
decoder.decode(decodeInputs, decodeIndices, outputs);
|
||||||
|
|
||||||
|
// Step 3: fill original application buffer with decoded data
|
||||||
|
if (fillBuffer) {
|
||||||
|
for (int i = 0; i < decodeIndices.length; i++) {
|
||||||
|
int missingBlkIdx = decodeIndices[i];
|
||||||
|
StripingChunk chunk = alignedStripe.chunks[missingBlkIdx];
|
||||||
|
if (chunk.state == StripingChunk.MISSING && chunk.useChunkBuffer()) {
|
||||||
|
chunk.getChunkBuffer().copyFrom(outputs[i].getBuffer());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Prepare erased indices.
|
||||||
|
*/
|
||||||
|
int[] prepareErasedIndices() {
|
||||||
|
int[] decodeIndices = new int[parityBlkNum];
|
||||||
|
int pos = 0;
|
||||||
|
for (int i = 0; i < alignedStripe.chunks.length; i++) {
|
||||||
|
if (alignedStripe.chunks[i] != null &&
|
||||||
|
alignedStripe.chunks[i].state == StripingChunk.MISSING){
|
||||||
|
decodeIndices[pos++] = i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int[] erasedIndices = Arrays.copyOf(decodeIndices, pos);
|
||||||
|
return erasedIndices;
|
||||||
|
}
|
||||||
|
|
||||||
|
void clearFutures() {
|
||||||
|
for (Future<Void> future : futures.keySet()) {
|
||||||
|
future.cancel(false);
|
||||||
|
}
|
||||||
|
futures.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean useDirectBuffer() {
|
||||||
|
return decoder.preferDirectBuffer();
|
||||||
|
}
|
||||||
|
}
|
|
@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.DFSStripedOutputStream;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
@ -32,7 +31,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
|
import org.apache.hadoop.hdfs.DFSStripedOutputStream;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -76,18 +75,6 @@ public class StripedBlockUtil {
|
||||||
public static final Logger LOG =
|
public static final Logger LOG =
|
||||||
LoggerFactory.getLogger(StripedBlockUtil.class);
|
LoggerFactory.getLogger(StripedBlockUtil.class);
|
||||||
|
|
||||||
/**
|
|
||||||
* Parses a striped block group into individual blocks.
|
|
||||||
* @param bg The striped block group
|
|
||||||
* @param ecPolicy The erasure coding policy
|
|
||||||
* @return An array of the blocks in the group
|
|
||||||
*/
|
|
||||||
public static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg,
|
|
||||||
ErasureCodingPolicy ecPolicy) {
|
|
||||||
return parseStripedBlockGroup(bg, ecPolicy.getCellSize(),
|
|
||||||
ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method parses a striped block group into individual blocks.
|
* This method parses a striped block group into individual blocks.
|
||||||
*
|
*
|
||||||
|
@ -112,7 +99,7 @@ public class StripedBlockUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method creates an internal block at the given index of a block group
|
* This method creates an internal block at the given index of a block group.
|
||||||
*
|
*
|
||||||
* @param idxInReturnedLocs The index in the stored locations in the
|
* @param idxInReturnedLocs The index in the stored locations in the
|
||||||
* {@link LocatedStripedBlock} object
|
* {@link LocatedStripedBlock} object
|
||||||
|
@ -169,7 +156,7 @@ public class StripedBlockUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the size of an internal block at the given index of a block group
|
* Get the size of an internal block at the given index of a block group.
|
||||||
*
|
*
|
||||||
* @param dataSize Size of the block group only counting data blocks
|
* @param dataSize Size of the block group only counting data blocks
|
||||||
* @param cellSize The size of a striping cell
|
* @param cellSize The size of a striping cell
|
||||||
|
@ -237,7 +224,7 @@ public class StripedBlockUtil {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Given a byte's offset in an internal block, calculate the offset in
|
* Given a byte's offset in an internal block, calculate the offset in
|
||||||
* the block group
|
* the block group.
|
||||||
*/
|
*/
|
||||||
public static long offsetInBlkToOffsetInBG(int cellSize, int dataBlkNum,
|
public static long offsetInBlkToOffsetInBG(int cellSize, int dataBlkNum,
|
||||||
long offsetInBlk, int idxInBlockGroup) {
|
long offsetInBlk, int idxInBlockGroup) {
|
||||||
|
@ -248,12 +235,12 @@ public class StripedBlockUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the next completed striped read task
|
* Get the next completed striped read task.
|
||||||
*
|
*
|
||||||
* @return {@link StripingChunkReadResult} indicating the status of the read task
|
* @return {@link StripingChunkReadResult} indicating the status of the read
|
||||||
* succeeded, and the block index of the task. If the method times
|
* task succeeded, and the block index of the task. If the method
|
||||||
* out without getting any completed read tasks, -1 is returned as
|
* times out without getting any completed read tasks, -1 is
|
||||||
* block index.
|
* returned as block index.
|
||||||
* @throws InterruptedException
|
* @throws InterruptedException
|
||||||
*/
|
*/
|
||||||
public static StripingChunkReadResult getNextCompletedStripedRead(
|
public static StripingChunkReadResult getNextCompletedStripedRead(
|
||||||
|
@ -287,7 +274,7 @@ public class StripedBlockUtil {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the total usage of the striped blocks, which is the total of data
|
* Get the total usage of the striped blocks, which is the total of data
|
||||||
* blocks and parity blocks
|
* blocks and parity blocks.
|
||||||
*
|
*
|
||||||
* @param numDataBlkBytes
|
* @param numDataBlkBytes
|
||||||
* Size of the block group only counting data blocks
|
* Size of the block group only counting data blocks
|
||||||
|
@ -307,91 +294,6 @@ public class StripedBlockUtil {
|
||||||
return numDataBlkBytes + numParityBlkBytes;
|
return numDataBlkBytes + numParityBlkBytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Initialize the decoding input buffers based on the chunk states in an
|
|
||||||
* {@link AlignedStripe}. For each chunk that was not initially requested,
|
|
||||||
* schedule a new fetch request with the decoding input buffer as transfer
|
|
||||||
* destination.
|
|
||||||
*/
|
|
||||||
public static ByteBuffer[] initDecodeInputs(AlignedStripe alignedStripe,
|
|
||||||
int dataBlkNum, int parityBlkNum) {
|
|
||||||
ByteBuffer[] decodeInputs = new ByteBuffer[dataBlkNum + parityBlkNum];
|
|
||||||
for (int i = 0; i < decodeInputs.length; i++) {
|
|
||||||
decodeInputs[i] = ByteBuffer.allocate(
|
|
||||||
(int) alignedStripe.getSpanInBlock());
|
|
||||||
}
|
|
||||||
// read the full data aligned stripe
|
|
||||||
for (int i = 0; i < dataBlkNum; i++) {
|
|
||||||
if (alignedStripe.chunks[i] == null) {
|
|
||||||
alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return decodeInputs;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Some fetched {@link StripingChunk} might be stored in original application
|
|
||||||
* buffer instead of prepared decode input buffers. Some others are beyond
|
|
||||||
* the range of the internal blocks and should correspond to all zero bytes.
|
|
||||||
* When all pending requests have returned, this method should be called to
|
|
||||||
* finalize decode input buffers.
|
|
||||||
*/
|
|
||||||
public static void finalizeDecodeInputs(final ByteBuffer[] decodeInputs,
|
|
||||||
AlignedStripe alignedStripe) {
|
|
||||||
for (int i = 0; i < alignedStripe.chunks.length; i++) {
|
|
||||||
final StripingChunk chunk = alignedStripe.chunks[i];
|
|
||||||
if (chunk != null && chunk.state == StripingChunk.FETCHED) {
|
|
||||||
if (chunk.useChunkBuffer()) {
|
|
||||||
chunk.getChunkBuffer().copyTo(decodeInputs[i]);
|
|
||||||
} else {
|
|
||||||
chunk.getByteBuffer().flip();
|
|
||||||
}
|
|
||||||
} else if (chunk != null && chunk.state == StripingChunk.ALLZERO) {
|
|
||||||
//ZERO it. Will be better handled in other following issue.
|
|
||||||
byte[] emptyBytes = new byte[decodeInputs[i].limit()];
|
|
||||||
decodeInputs[i].put(emptyBytes);
|
|
||||||
decodeInputs[i].flip();
|
|
||||||
} else {
|
|
||||||
decodeInputs[i] = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Decode based on the given input buffers and erasure coding policy.
|
|
||||||
*/
|
|
||||||
public static void decodeAndFillBuffer(final ByteBuffer[] decodeInputs,
|
|
||||||
AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum,
|
|
||||||
RawErasureDecoder decoder) {
|
|
||||||
// Step 1: prepare indices and output buffers for missing data units
|
|
||||||
int[] decodeIndices = new int[parityBlkNum];
|
|
||||||
int pos = 0;
|
|
||||||
for (int i = 0; i < dataBlkNum; i++) {
|
|
||||||
if (alignedStripe.chunks[i] != null &&
|
|
||||||
alignedStripe.chunks[i].state == StripingChunk.MISSING){
|
|
||||||
decodeIndices[pos++] = i;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
decodeIndices = Arrays.copyOf(decodeIndices, pos);
|
|
||||||
ByteBuffer[] decodeOutputs = new ByteBuffer[decodeIndices.length];
|
|
||||||
for (int i = 0; i < decodeOutputs.length; i++) {
|
|
||||||
decodeOutputs[i] = ByteBuffer.allocate(
|
|
||||||
(int) alignedStripe.getSpanInBlock());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Step 2: decode into prepared output buffers
|
|
||||||
decoder.decode(decodeInputs, decodeIndices, decodeOutputs);
|
|
||||||
|
|
||||||
// Step 3: fill original application buffer with decoded data
|
|
||||||
for (int i = 0; i < decodeIndices.length; i++) {
|
|
||||||
int missingBlkIdx = decodeIndices[i];
|
|
||||||
StripingChunk chunk = alignedStripe.chunks[missingBlkIdx];
|
|
||||||
if (chunk.state == StripingChunk.MISSING && chunk.useChunkBuffer()) {
|
|
||||||
chunk.getChunkBuffer().copyFrom(decodeOutputs[i]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Similar functionality with {@link #divideByteRangeIntoStripes}, but is used
|
* Similar functionality with {@link #divideByteRangeIntoStripes}, but is used
|
||||||
* by stateful read and uses ByteBuffer as reading target buffer. Besides the
|
* by stateful read and uses ByteBuffer as reading target buffer. Besides the
|
||||||
|
@ -485,7 +387,7 @@ public class StripedBlockUtil {
|
||||||
/**
|
/**
|
||||||
* Map the logical byte range to a set of inclusive {@link StripingCell}
|
* Map the logical byte range to a set of inclusive {@link StripingCell}
|
||||||
* instances, each representing the overlap of the byte range to a cell
|
* instances, each representing the overlap of the byte range to a cell
|
||||||
* used by {@link DFSStripedOutputStream} in encoding
|
* used by {@link DFSStripedOutputStream} in encoding.
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
private static StripingCell[] getStripingCellsOfByteRange(
|
private static StripingCell[] getStripingCellsOfByteRange(
|
||||||
|
@ -530,7 +432,7 @@ public class StripedBlockUtil {
|
||||||
int dataBlkNum = ecPolicy.getNumDataUnits();
|
int dataBlkNum = ecPolicy.getNumDataUnits();
|
||||||
int parityBlkNum = ecPolicy.getNumParityUnits();
|
int parityBlkNum = ecPolicy.getNumParityUnits();
|
||||||
|
|
||||||
VerticalRange ranges[] = new VerticalRange[dataBlkNum + parityBlkNum];
|
VerticalRange[] ranges = new VerticalRange[dataBlkNum + parityBlkNum];
|
||||||
|
|
||||||
long earliestStart = Long.MAX_VALUE;
|
long earliestStart = Long.MAX_VALUE;
|
||||||
long latestEnd = -1;
|
long latestEnd = -1;
|
||||||
|
@ -675,7 +577,7 @@ public class StripedBlockUtil {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static class StripingCell {
|
static class StripingCell {
|
||||||
final ErasureCodingPolicy ecPolicy;
|
final ErasureCodingPolicy ecPolicy;
|
||||||
/** Logical order in a block group, used when doing I/O to a block group */
|
/** Logical order in a block group, used when doing I/O to a block group. */
|
||||||
final int idxInBlkGroup;
|
final int idxInBlkGroup;
|
||||||
final int idxInInternalBlk;
|
final int idxInInternalBlk;
|
||||||
final int idxInStripe;
|
final int idxInStripe;
|
||||||
|
@ -738,7 +640,7 @@ public class StripedBlockUtil {
|
||||||
*/
|
*/
|
||||||
public static class AlignedStripe {
|
public static class AlignedStripe {
|
||||||
public VerticalRange range;
|
public VerticalRange range;
|
||||||
/** status of each chunk in the stripe */
|
/** status of each chunk in the stripe. */
|
||||||
public final StripingChunk[] chunks;
|
public final StripingChunk[] chunks;
|
||||||
public int fetchedChunksNum = 0;
|
public int fetchedChunksNum = 0;
|
||||||
public int missingChunksNum = 0;
|
public int missingChunksNum = 0;
|
||||||
|
@ -790,9 +692,9 @@ public class StripedBlockUtil {
|
||||||
* +-----+
|
* +-----+
|
||||||
*/
|
*/
|
||||||
public static class VerticalRange {
|
public static class VerticalRange {
|
||||||
/** start offset in the block group (inclusive) */
|
/** start offset in the block group (inclusive). */
|
||||||
public long offsetInBlock;
|
public long offsetInBlock;
|
||||||
/** length of the stripe range */
|
/** length of the stripe range. */
|
||||||
public long spanInBlock;
|
public long spanInBlock;
|
||||||
|
|
||||||
public VerticalRange(long offsetInBlock, long length) {
|
public VerticalRange(long offsetInBlock, long length) {
|
||||||
|
@ -801,7 +703,7 @@ public class StripedBlockUtil {
|
||||||
this.spanInBlock = length;
|
this.spanInBlock = length;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** whether a position is in the range */
|
/** whether a position is in the range. */
|
||||||
public boolean include(long pos) {
|
public boolean include(long pos) {
|
||||||
return pos >= offsetInBlock && pos < offsetInBlock + spanInBlock;
|
return pos >= offsetInBlock && pos < offsetInBlock + spanInBlock;
|
||||||
}
|
}
|
||||||
|
@ -915,7 +817,7 @@ public class StripedBlockUtil {
|
||||||
/**
|
/**
|
||||||
* Note: target will be ready-to-read state after the call.
|
* Note: target will be ready-to-read state after the call.
|
||||||
*/
|
*/
|
||||||
void copyTo(ByteBuffer target) {
|
public void copyTo(ByteBuffer target) {
|
||||||
for (ByteBuffer slice : slices) {
|
for (ByteBuffer slice : slices) {
|
||||||
slice.flip();
|
slice.flip();
|
||||||
target.put(slice);
|
target.put(slice);
|
||||||
|
@ -923,7 +825,7 @@ public class StripedBlockUtil {
|
||||||
target.flip();
|
target.flip();
|
||||||
}
|
}
|
||||||
|
|
||||||
void copyFrom(ByteBuffer src) {
|
public void copyFrom(ByteBuffer src) {
|
||||||
ByteBuffer tmp;
|
ByteBuffer tmp;
|
||||||
int len;
|
int len;
|
||||||
for (ByteBuffer slice : slices) {
|
for (ByteBuffer slice : slices) {
|
||||||
|
@ -970,6 +872,28 @@ public class StripedBlockUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Used to indicate the buffered data's range in the block group. */
|
||||||
|
public static class StripeRange {
|
||||||
|
/** start offset in the block group (inclusive). */
|
||||||
|
final long offsetInBlock;
|
||||||
|
/** length of the stripe range. */
|
||||||
|
final long length;
|
||||||
|
|
||||||
|
public StripeRange(long offsetInBlock, long length) {
|
||||||
|
Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
|
||||||
|
this.offsetInBlock = offsetInBlock;
|
||||||
|
this.length = length;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean include(long pos) {
|
||||||
|
return pos >= offsetInBlock && pos < offsetInBlock + length;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getLength() {
|
||||||
|
return length;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if the information such as IDs and generation stamps in block-i
|
* Check if the information such as IDs and generation stamps in block-i
|
||||||
* match the block group.
|
* match the block group.
|
||||||
|
|
|
@ -283,5 +283,4 @@ public class TestStripedBlockUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue