HDFS-8901. Use ByteBuffer in striping positional read. Contributed by Sammi Chen and Kai Zheng.

This commit is contained in:
Zhe Zhang 2016-09-08 11:54:33 -07:00
parent 20a20c2f6e
commit 401db4fc65
6 changed files with 285 additions and 137 deletions

View File

@ -304,7 +304,7 @@ public class DataChecksum implements Checksum {
bytesPerChecksum, checksums.array(), crcsOffset, fileName, basePos);
return;
}
if (NativeCrc32.isAvailable()) {
if (NativeCrc32.isAvailable() && data.isDirect()) {
NativeCrc32.verifyChunkedSums(bytesPerChecksum, type.id, checksums, data,
fileName, basePos);
} else {

View File

@ -533,7 +533,8 @@ public class DFSInputStream extends FSInputStream
* Open a DataInputStream to a DataNode so that it can be read from.
* We get block ID and the IDs of the destinations at startup, from the namenode.
*/
private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
private synchronized DatanodeInfo blockSeekTo(long target)
throws IOException {
if (target >= getFileLength()) {
throw new IOException("Attempted to read past end of file");
}
@ -962,14 +963,14 @@ public class DFSInputStream extends FSInputStream
}
protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
byte[] buf, int offset, CorruptedBlocks corruptedBlocks)
ByteBuffer buf, CorruptedBlocks corruptedBlocks)
throws IOException {
block = refreshLocatedBlock(block);
while (true) {
DNAddrPair addressPair = chooseDataNode(block, null);
try {
actualGetFromOneDataNode(addressPair, block, start, end,
buf, offset, corruptedBlocks);
buf, corruptedBlocks);
return;
} catch (IOException e) {
checkInterrupted(e); // check if the read has been interrupted
@ -988,12 +989,10 @@ public class DFSInputStream extends FSInputStream
return new Callable<ByteBuffer>() {
@Override
public ByteBuffer call() throws Exception {
byte[] buf = bb.array();
int offset = bb.position();
try (TraceScope ignored = dfsClient.getTracer().
newScope("hedgedRead" + hedgedReadId, parentSpanId)) {
actualGetFromOneDataNode(datanode, block, start, end, buf,
offset, corruptedBlocks);
actualGetFromOneDataNode(datanode, block, start, end, bb,
corruptedBlocks);
return bb;
}
}
@ -1007,13 +1006,12 @@ public class DFSInputStream extends FSInputStream
* @param block the located block containing the requested data
* @param startInBlk the startInBlk offset of the block
* @param endInBlk the endInBlk offset of the block
* @param buf the given byte array into which the data is read
* @param offset the offset in buf
* @param buf the given byte buffer into which the data is read
* @param corruptedBlocks map recording list of datanodes with corrupted
* block replica
*/
void actualGetFromOneDataNode(final DNAddrPair datanode, LocatedBlock block,
final long startInBlk, final long endInBlk, byte[] buf, int offset,
final long startInBlk, final long endInBlk, ByteBuffer buf,
CorruptedBlocks corruptedBlocks)
throws IOException {
DFSClientFaultInjector.get().startFetchFromDatanode();
@ -1031,7 +1029,22 @@ public class DFSInputStream extends FSInputStream
DFSClientFaultInjector.get().fetchFromDatanodeException();
reader = getBlockReader(block, startInBlk, len, datanode.addr,
datanode.storageType, datanode.info);
int nread = reader.readAll(buf, offset, len);
//Behave exactly as the readAll() call
ByteBuffer tmp = buf.duplicate();
tmp.limit(tmp.position() + len);
tmp = tmp.slice();
int nread = 0;
int ret;
while (true) {
ret = reader.read(tmp);
if (ret <= 0) {
break;
}
nread += ret;
}
buf.position(buf.position() + nread);
IOUtilsClient.updateReadStatistics(readStatistics, nread, reader);
dfsClient.updateFileSystemReadStats(
reader.getNetworkDistance(), nread);
@ -1098,7 +1111,7 @@ public class DFSInputStream extends FSInputStream
* time. We then wait on which ever read returns first.
*/
private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
long end, byte[] buf, int offset, CorruptedBlocks corruptedBlocks)
long end, ByteBuffer buf, CorruptedBlocks corruptedBlocks)
throws IOException {
final DfsClientConf conf = dfsClient.getConf();
ArrayList<Future<ByteBuffer>> futures = new ArrayList<>();
@ -1130,8 +1143,8 @@ public class DFSInputStream extends FSInputStream
conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
if (future != null) {
ByteBuffer result = future.get();
System.arraycopy(result.array(), result.position(), buf, offset,
len);
result.flip();
buf.put(result);
return;
}
DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
@ -1173,8 +1186,8 @@ public class DFSInputStream extends FSInputStream
// cancel the rest.
cancelAll(futures);
dfsClient.getHedgedReadMetrics().incHedgedReadWins();
System.arraycopy(result.array(), result.position(), buf, offset,
len);
result.flip();
buf.put(result);
return;
} catch (InterruptedException ie) {
// Ignore and retry
@ -1244,7 +1257,8 @@ public class DFSInputStream extends FSInputStream
* access key from its memory since it's considered expired based on
* the estimated expiration date.
*/
if (ex instanceof InvalidBlockTokenException || ex instanceof InvalidToken) {
if (ex instanceof InvalidBlockTokenException ||
ex instanceof InvalidToken) {
DFSClient.LOG.info("Access token was invalid when connecting to "
+ targetAddr + " : " + ex);
return true;
@ -1272,7 +1286,8 @@ public class DFSInputStream extends FSInputStream
try (TraceScope scope = dfsClient.
newReaderTraceScope("DFSInputStream#byteArrayPread",
src, position, length)) {
int retLen = pread(position, buffer, offset, length);
ByteBuffer bb = ByteBuffer.wrap(buffer, offset, length);
int retLen = pread(position, bb);
if (retLen < length) {
dfsClient.addRetLenToReaderScope(scope, retLen);
}
@ -1280,7 +1295,7 @@ public class DFSInputStream extends FSInputStream
}
}
private int pread(long position, byte[] buffer, int offset, int length)
private int pread(long position, ByteBuffer buffer)
throws IOException {
// sanity checks
dfsClient.checkOpen();
@ -1292,6 +1307,7 @@ public class DFSInputStream extends FSInputStream
if ((position < 0) || (position >= filelen)) {
return -1;
}
int length = buffer.remaining();
int realLen = length;
if ((position + length) > filelen) {
realLen = (int)(filelen - position);
@ -1304,14 +1320,16 @@ public class DFSInputStream extends FSInputStream
CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
for (LocatedBlock blk : blockRange) {
long targetStart = position - blk.getStartOffset();
long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
int bytesToRead = (int) Math.min(remaining,
blk.getBlockSize() - targetStart);
long targetEnd = targetStart + bytesToRead - 1;
try {
if (dfsClient.isHedgedReadsEnabled() && !blk.isStriped()) {
hedgedFetchBlockByteRange(blk, targetStart,
targetStart + bytesToRead - 1, buffer, offset, corruptedBlocks);
targetEnd, buffer, corruptedBlocks);
} else {
fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
buffer, offset, corruptedBlocks);
fetchBlockByteRange(blk, targetStart, targetEnd,
buffer, corruptedBlocks);
}
} finally {
// Check and report if any block replicas are corrupted.
@ -1323,7 +1341,6 @@ public class DFSInputStream extends FSInputStream
remaining -= bytesToRead;
position += bytesToRead;
offset += bytesToRead;
}
assert remaining == 0 : "Wrong number of bytes read.";
return realLen;
@ -1457,7 +1474,8 @@ public class DFSInputStream extends FSInputStream
* If another node could not be found, then returns false.
*/
@Override
public synchronized boolean seekToNewSource(long targetPos) throws IOException {
public synchronized boolean seekToNewSource(long targetPos)
throws IOException {
if (currentNode == null) {
return seekToBlockSource(targetPos);
}

View File

@ -307,8 +307,8 @@ public class DFSStripedInputStream extends DFSInputStream {
stripeLimit - stripeBufOffset);
LocatedStripedBlock blockGroup = (LocatedStripedBlock) currentLocatedBlock;
AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(ecPolicy, cellSize,
blockGroup, offsetInBlockGroup,
AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(ecPolicy,
cellSize, blockGroup, offsetInBlockGroup,
offsetInBlockGroup + stripeRange.length - 1, curStripeBuf);
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
blockGroup, cellSize, dataBlkNum, parityBlkNum);
@ -523,13 +523,13 @@ public class DFSStripedInputStream extends DFSInputStream {
*/
@Override
protected void fetchBlockByteRange(LocatedBlock block, long start,
long end, byte[] buf, int offset, CorruptedBlocks corruptedBlocks)
long end, ByteBuffer buf, CorruptedBlocks corruptedBlocks)
throws IOException {
// Refresh the striped block group
LocatedStripedBlock blockGroup = getBlockGroupAt(block.getStartOffset());
AlignedStripe[] stripes = StripedBlockUtil.divideByteRangeIntoStripes(
ecPolicy, cellSize, blockGroup, start, end, buf, offset);
ecPolicy, cellSize, blockGroup, start, end, buf);
CompletionService<Void> readService = new ExecutorCompletionService<>(
dfsClient.getStripedReadsThreadPool());
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
@ -542,6 +542,7 @@ public class DFSStripedInputStream extends DFSInputStream {
blks, preaderInfos, corruptedBlocks);
preader.readStripe();
}
buf.position(buf.position() + (int)(end - start + 1));
} finally {
for (BlockReaderInfo preaderInfo : preaderInfos) {
closeReader(preaderInfo);
@ -698,16 +699,15 @@ public class DFSStripedInputStream extends DFSInputStream {
}
private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) {
if (chunk.byteBuffer != null) {
ByteBufferStrategy strategy =
new ByteBufferStrategy(chunk.byteBuffer, readStatistics, dfsClient);
if (chunk.useByteBuffer()) {
ByteBufferStrategy strategy = new ByteBufferStrategy(
chunk.getByteBuffer(), readStatistics, dfsClient);
return new ByteBufferStrategy[]{strategy};
} else {
ByteBufferStrategy[] strategies =
new ByteBufferStrategy[chunk.byteArray.getOffsets().length];
new ByteBufferStrategy[chunk.getChunkBuffer().getSlices().size()];
for (int i = 0; i < strategies.length; i++) {
ByteBuffer buffer = ByteBuffer.wrap(chunk.byteArray.buf(),
chunk.byteArray.getOffsets()[i], chunk.byteArray.getLengths()[i]);
ByteBuffer buffer = chunk.getChunkBuffer().getSlice(i);
strategies[i] =
new ByteBufferStrategy(buffer, readStatistics, dfsClient);
}
@ -814,7 +814,7 @@ public class DFSStripedInputStream extends DFSInputStream {
}
class PositionStripeReader extends StripeReader {
private byte[][] decodeInputs = null;
private ByteBuffer[] decodeInputs = null;
PositionStripeReader(CompletionService<Void> service,
AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
@ -836,8 +836,6 @@ public class DFSStripedInputStream extends DFSInputStream {
Preconditions.checkState(index >= dataBlkNum &&
alignedStripe.chunks[index] == null);
alignedStripe.chunks[index] = new StripingChunk(decodeInputs[index]);
alignedStripe.chunks[index].addByteArraySlice(0,
(int) alignedStripe.getSpanInBlock());
return true;
}

View File

@ -73,7 +73,8 @@ import java.util.concurrent.TimeUnit;
@InterfaceAudience.Private
public class StripedBlockUtil {
public static final Logger LOG = LoggerFactory.getLogger(StripedBlockUtil.class);
public static final Logger LOG =
LoggerFactory.getLogger(StripedBlockUtil.class);
/**
* Parses a striped block group into individual blocks.
@ -312,16 +313,17 @@ public class StripedBlockUtil {
* schedule a new fetch request with the decoding input buffer as transfer
* destination.
*/
public static byte[][] initDecodeInputs(AlignedStripe alignedStripe,
public static ByteBuffer[] initDecodeInputs(AlignedStripe alignedStripe,
int dataBlkNum, int parityBlkNum) {
byte[][] decodeInputs =
new byte[dataBlkNum + parityBlkNum][(int) alignedStripe.getSpanInBlock()];
ByteBuffer[] decodeInputs = new ByteBuffer[dataBlkNum + parityBlkNum];
for (int i = 0; i < decodeInputs.length; i++) {
decodeInputs[i] = ByteBuffer.allocate(
(int) alignedStripe.getSpanInBlock());
}
// read the full data aligned stripe
for (int i = 0; i < dataBlkNum; i++) {
if (alignedStripe.chunks[i] == null) {
alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]);
alignedStripe.chunks[i].addByteArraySlice(0,
(int) alignedStripe.getSpanInBlock());
}
}
return decodeInputs;
@ -334,14 +336,21 @@ public class StripedBlockUtil {
* When all pending requests have returned, this method should be called to
* finalize decode input buffers.
*/
public static void finalizeDecodeInputs(final byte[][] decodeInputs,
public static void finalizeDecodeInputs(final ByteBuffer[] decodeInputs,
AlignedStripe alignedStripe) {
for (int i = 0; i < alignedStripe.chunks.length; i++) {
final StripingChunk chunk = alignedStripe.chunks[i];
if (chunk != null && chunk.state == StripingChunk.FETCHED) {
chunk.copyTo(decodeInputs[i]);
if (chunk.useChunkBuffer()) {
chunk.getChunkBuffer().copyTo(decodeInputs[i]);
} else {
chunk.getByteBuffer().flip();
}
} else if (chunk != null && chunk.state == StripingChunk.ALLZERO) {
Arrays.fill(decodeInputs[i], (byte) 0);
//ZERO it. Will be better handled in other following issue.
byte[] emptyBytes = new byte[decodeInputs[i].limit()];
decodeInputs[i].put(emptyBytes);
decodeInputs[i].flip();
} else {
decodeInputs[i] = null;
}
@ -351,7 +360,7 @@ public class StripedBlockUtil {
/**
* Decode based on the given input buffers and erasure coding policy.
*/
public static void decodeAndFillBuffer(final byte[][] decodeInputs,
public static void decodeAndFillBuffer(final ByteBuffer[] decodeInputs,
AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum,
RawErasureDecoder decoder) {
// Step 1: prepare indices and output buffers for missing data units
@ -364,8 +373,11 @@ public class StripedBlockUtil {
}
}
decodeIndices = Arrays.copyOf(decodeIndices, pos);
byte[][] decodeOutputs =
new byte[decodeIndices.length][(int) alignedStripe.getSpanInBlock()];
ByteBuffer[] decodeOutputs = new ByteBuffer[decodeIndices.length];
for (int i = 0; i < decodeOutputs.length; i++) {
decodeOutputs[i] = ByteBuffer.allocate(
(int) alignedStripe.getSpanInBlock());
}
// Step 2: decode into prepared output buffers
decoder.decode(decodeInputs, decodeIndices, decodeOutputs);
@ -374,8 +386,8 @@ public class StripedBlockUtil {
for (int i = 0; i < decodeIndices.length; i++) {
int missingBlkIdx = decodeIndices[i];
StripingChunk chunk = alignedStripe.chunks[missingBlkIdx];
if (chunk.state == StripingChunk.MISSING) {
chunk.copyFrom(decodeOutputs[i]);
if (chunk.state == StripingChunk.MISSING && chunk.useChunkBuffer()) {
chunk.getChunkBuffer().copyFrom(decodeOutputs[i]);
}
}
}
@ -402,7 +414,8 @@ public class StripedBlockUtil {
// Step 4: calculate each chunk's position in destination buffer. Since the
// whole read range is within a single stripe, the logic is simpler here.
int bufOffset = (int) (rangeStartInBlockGroup % ((long) cellSize * dataBlkNum));
int bufOffset =
(int) (rangeStartInBlockGroup % ((long) cellSize * dataBlkNum));
for (StripingCell cell : cells) {
long cellStart = cell.idxInInternalBlk * cellSize + cell.offset;
long cellEnd = cellStart + cell.size - 1;
@ -437,15 +450,14 @@ public class StripedBlockUtil {
* @param rangeStartInBlockGroup The byte range's start offset in block group
* @param rangeEndInBlockGroup The byte range's end offset in block group
* @param buf Destination buffer of the read operation for the byte range
* @param offsetInBuf Start offset into the destination buffer
*
* At most 5 stripes will be generated from each logical range, as
* demonstrated in the header of {@link AlignedStripe}.
*/
public static AlignedStripe[] divideByteRangeIntoStripes(ErasureCodingPolicy ecPolicy,
public static AlignedStripe[] divideByteRangeIntoStripes(
ErasureCodingPolicy ecPolicy,
int cellSize, LocatedStripedBlock blockGroup,
long rangeStartInBlockGroup, long rangeEndInBlockGroup, byte[] buf,
int offsetInBuf) {
long rangeStartInBlockGroup, long rangeEndInBlockGroup, ByteBuffer buf) {
// Step 0: analyze range and calculate basic parameters
final int dataBlkNum = ecPolicy.getNumDataUnits();
@ -462,7 +474,7 @@ public class StripedBlockUtil {
AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges);
// Step 4: calculate each chunk's position in destination buffer
calcualteChunkPositionsInBuf(cellSize, stripes, cells, buf, offsetInBuf);
calcualteChunkPositionsInBuf(cellSize, stripes, cells, buf);
// Step 5: prepare ALLZERO blocks
prepareAllZeroChunks(blockGroup, stripes, cellSize, dataBlkNum);
@ -476,7 +488,8 @@ public class StripedBlockUtil {
* used by {@link DFSStripedOutputStream} in encoding
*/
@VisibleForTesting
private static StripingCell[] getStripingCellsOfByteRange(ErasureCodingPolicy ecPolicy,
private static StripingCell[] getStripingCellsOfByteRange(
ErasureCodingPolicy ecPolicy,
int cellSize, LocatedStripedBlock blockGroup,
long rangeStartInBlockGroup, long rangeEndInBlockGroup) {
Preconditions.checkArgument(
@ -511,7 +524,8 @@ public class StripedBlockUtil {
* the physical byte range (inclusive) on each stored internal block.
*/
@VisibleForTesting
private static VerticalRange[] getRangesForInternalBlocks(ErasureCodingPolicy ecPolicy,
private static VerticalRange[] getRangesForInternalBlocks(
ErasureCodingPolicy ecPolicy,
int cellSize, StripingCell[] cells) {
int dataBlkNum = ecPolicy.getNumDataUnits();
int parityBlkNum = ecPolicy.getNumParityUnits();
@ -575,8 +589,7 @@ public class StripedBlockUtil {
}
private static void calcualteChunkPositionsInBuf(int cellSize,
AlignedStripe[] stripes, StripingCell[] cells, byte[] buf,
int offsetInBuf) {
AlignedStripe[] stripes, StripingCell[] cells, ByteBuffer buf) {
/**
* | <--------------- AlignedStripe --------------->|
*
@ -598,6 +611,7 @@ public class StripedBlockUtil {
for (StripingCell cell : cells) {
long cellStart = cell.idxInInternalBlk * cellSize + cell.offset;
long cellEnd = cellStart + cell.size - 1;
StripingChunk chunk;
for (AlignedStripe s : stripes) {
long stripeEnd = s.getOffsetInBlock() + s.getSpanInBlock() - 1;
long overlapStart = Math.max(cellStart, s.getOffsetInBlock());
@ -606,11 +620,13 @@ public class StripedBlockUtil {
if (overLapLen <= 0) {
continue;
}
if (s.chunks[cell.idxInStripe] == null) {
s.chunks[cell.idxInStripe] = new StripingChunk(buf);
chunk = s.chunks[cell.idxInStripe];
if (chunk == null) {
chunk = new StripingChunk();
s.chunks[cell.idxInStripe] = chunk;
}
s.chunks[cell.idxInStripe].addByteArraySlice(
(int)(offsetInBuf + done + overlapStart - cellStart), overLapLen);
chunk.getChunkBuffer().addSlice(buf,
(int) (done + overlapStart - cellStart), overLapLen);
}
done += cell.size;
}
@ -833,88 +849,89 @@ public class StripedBlockUtil {
*/
public int state = REQUESTED;
public final ChunkByteArray byteArray;
public final ByteBuffer byteBuffer;
private final ChunkByteBuffer chunkBuffer;
private final ByteBuffer byteBuffer;
public StripingChunk(byte[] buf) {
this.byteArray = new ChunkByteArray(buf);
public StripingChunk() {
this.chunkBuffer = new ChunkByteBuffer();
byteBuffer = null;
}
public StripingChunk(ByteBuffer buf) {
this.byteArray = null;
this.chunkBuffer = null;
this.byteBuffer = buf;
}
public StripingChunk(int state) {
this.byteArray = null;
this.chunkBuffer = null;
this.byteBuffer = null;
this.state = state;
}
public void addByteArraySlice(int offset, int length) {
assert byteArray != null;
byteArray.offsetsInBuf.add(offset);
byteArray.lengthsInBuf.add(length);
public boolean useByteBuffer(){
return byteBuffer != null;
}
void copyTo(byte[] target) {
assert byteArray != null;
byteArray.copyTo(target);
public boolean useChunkBuffer() {
return chunkBuffer != null;
}
void copyFrom(byte[] src) {
assert byteArray != null;
byteArray.copyFrom(src);
public ByteBuffer getByteBuffer() {
assert byteBuffer != null;
return byteBuffer;
}
public ChunkByteBuffer getChunkBuffer() {
assert chunkBuffer != null;
return chunkBuffer;
}
}
public static class ChunkByteArray {
private final byte[] buf;
private final List<Integer> offsetsInBuf;
private final List<Integer> lengthsInBuf;
/**
* A utility to manage ByteBuffer slices for a reader.
*/
public static class ChunkByteBuffer {
private final List<ByteBuffer> slices;
ChunkByteArray(byte[] buf) {
this.buf = buf;
this.offsetsInBuf = new ArrayList<>();
this.lengthsInBuf = new ArrayList<>();
ChunkByteBuffer() {
this.slices = new ArrayList<>();
}
public int[] getOffsets() {
int[] offsets = new int[offsetsInBuf.size()];
for (int i = 0; i < offsets.length; i++) {
offsets[i] = offsetsInBuf.get(i);
public void addSlice(ByteBuffer buffer, int offset, int len) {
ByteBuffer tmp = buffer.duplicate();
tmp.position(buffer.position() + offset);
tmp.limit(buffer.position() + offset + len);
slices.add(tmp.slice());
}
public ByteBuffer getSlice(int i) {
return slices.get(i);
}
public List<ByteBuffer> getSlices() {
return slices;
}
/**
* Note: target will be ready-to-read state after the call.
*/
void copyTo(ByteBuffer target) {
for (ByteBuffer slice : slices) {
slice.flip();
target.put(slice);
}
return offsets;
target.flip();
}
public int[] getLengths() {
int[] lens = new int[this.lengthsInBuf.size()];
for (int i = 0; i < lens.length; i++) {
lens[i] = this.lengthsInBuf.get(i);
}
return lens;
}
public byte[] buf() {
return buf;
}
void copyTo(byte[] target) {
int posInBuf = 0;
for (int i = 0; i < offsetsInBuf.size(); i++) {
System.arraycopy(buf, offsetsInBuf.get(i),
target, posInBuf, lengthsInBuf.get(i));
posInBuf += lengthsInBuf.get(i);
}
}
void copyFrom(byte[] src) {
int srcPos = 0;
for (int j = 0; j < offsetsInBuf.size(); j++) {
System.arraycopy(src, srcPos, buf, offsetsInBuf.get(j),
lengthsInBuf.get(j));
srcPos += lengthsInBuf.get(j);
void copyFrom(ByteBuffer src) {
ByteBuffer tmp;
int len;
for (ByteBuffer slice : slices) {
len = slice.remaining();
tmp = src.duplicate();
tmp.limit(tmp.position() + len);
slice.put(tmp);
src.position(src.position() + len);
}
}
}

View File

@ -57,7 +57,8 @@ import static org.junit.Assert.assertTrue;
public class TestDFSStripedInputStream {
public static final Log LOG = LogFactory.getLog(TestDFSStripedInputStream.class);
public static final Log LOG =
LogFactory.getLog(TestDFSStripedInputStream.class);
private MiniDFSCluster cluster;
private Configuration conf = new Configuration();
@ -272,12 +273,16 @@ public class TestDFSStripedInputStream {
// |10 |
done += in.read(0, readBuffer, 0, delta);
assertEquals(delta, done);
assertArrayEquals(Arrays.copyOf(expected, done),
Arrays.copyOf(readBuffer, done));
// both head and trail cells are partial
// |c_0 |c_1 |c_2 |c_3 |c_4 |c_5 |
// |256K - 10|missing|256K|256K|256K - 10|not in range|
done += in.read(delta, readBuffer, delta,
CELLSIZE * (DATA_BLK_NUM - 1) - 2 * delta);
assertEquals(CELLSIZE * (DATA_BLK_NUM - 1) - delta, done);
assertArrayEquals(Arrays.copyOf(expected, done),
Arrays.copyOf(readBuffer, done));
// read the rest
done += in.read(done, readBuffer, done, readSize - done);
assertEquals(readSize, done);
@ -291,8 +296,8 @@ public class TestDFSStripedInputStream {
testStatefulRead(true, true);
}
private void testStatefulRead(boolean useByteBuffer, boolean cellMisalignPacket)
throws Exception {
private void testStatefulRead(boolean useByteBuffer,
boolean cellMisalignPacket) throws Exception {
final int numBlocks = 2;
final int fileSize = numBlocks * BLOCK_GROUP_SIZE;
if (cellMisalignPacket) {
@ -302,7 +307,8 @@ public class TestDFSStripedInputStream {
}
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
NUM_STRIPE_PER_BLOCK, false);
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(filePath.toString(), 0, fileSize);
LocatedBlocks lbs = fs.getClient().namenode.
getBlockLocations(filePath.toString(), 0, fileSize);
assert lbs.getLocatedBlocks().size() == numBlocks;
for (LocatedBlock lb : lbs.getLocatedBlocks()) {
@ -360,4 +366,111 @@ public class TestDFSStripedInputStream {
}
fs.delete(filePath, true);
}
@Test
public void testStatefulReadWithDNFailure() throws Exception {
final int numBlocks = 4;
final int failedDNIdx = DATA_BLK_NUM - 1;
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
NUM_STRIPE_PER_BLOCK, false);
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
filePath.toString(), 0, BLOCK_GROUP_SIZE);
assert lbs.get(0) instanceof LocatedStripedBlock;
LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
for (int i = 0; i < DATA_BLK_NUM + PARITY_BLK_NUM; i++) {
Block blk = new Block(bg.getBlock().getBlockId() + i,
NUM_STRIPE_PER_BLOCK * CELLSIZE,
bg.getBlock().getGenerationStamp());
blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
cluster.injectBlocks(i, Arrays.asList(blk),
bg.getBlock().getBlockPoolId());
}
DFSStripedInputStream in =
new DFSStripedInputStream(fs.getClient(), filePath.toString(), false,
ecPolicy, null);
int readSize = BLOCK_GROUP_SIZE;
byte[] readBuffer = new byte[readSize];
byte[] expected = new byte[readSize];
/** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
for (int j = 0; j < DATA_BLK_NUM; j++) {
for (int k = 0; k < CELLSIZE; k++) {
int posInBlk = i * CELLSIZE + k;
int posInFile = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k;
expected[posInFile] = SimulatedFSDataset.simulatedByte(
new Block(bg.getBlock().getBlockId() + j), posInBlk);
}
}
}
ErasureCoderOptions coderOptions = new ErasureCoderOptions(
DATA_BLK_NUM, PARITY_BLK_NUM);
RawErasureDecoder rawDecoder = CodecUtil.createRawDecoder(conf,
ecPolicy.getCodecName(), coderOptions);
// Update the expected content for decoded data
int[] missingBlkIdx = new int[PARITY_BLK_NUM];
for (int i = 0; i < missingBlkIdx.length; i++) {
if (i == 0) {
missingBlkIdx[i] = failedDNIdx;
} else {
missingBlkIdx[i] = DATA_BLK_NUM + i;
}
}
cluster.stopDataNode(failedDNIdx);
for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
byte[][] decodeInputs = new byte[DATA_BLK_NUM + PARITY_BLK_NUM][CELLSIZE];
byte[][] decodeOutputs = new byte[missingBlkIdx.length][CELLSIZE];
for (int j = 0; j < DATA_BLK_NUM; j++) {
int posInBuf = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE;
if (j != failedDNIdx) {
System.arraycopy(expected, posInBuf, decodeInputs[j], 0, CELLSIZE);
}
}
for (int j = DATA_BLK_NUM; j < DATA_BLK_NUM + PARITY_BLK_NUM; j++) {
for (int k = 0; k < CELLSIZE; k++) {
int posInBlk = i * CELLSIZE + k;
decodeInputs[j][k] = SimulatedFSDataset.simulatedByte(
new Block(bg.getBlock().getBlockId() + j), posInBlk);
}
}
for (int m : missingBlkIdx) {
decodeInputs[m] = null;
}
rawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs);
int posInBuf = i * CELLSIZE * DATA_BLK_NUM + failedDNIdx * CELLSIZE;
System.arraycopy(decodeOutputs[0], 0, expected, posInBuf, CELLSIZE);
}
int delta = 10;
int done = 0;
// read a small delta, shouldn't trigger decode
// |cell_0 |
// |10 |
done += in.read(readBuffer, 0, delta);
assertEquals(delta, done);
// both head and trail cells are partial
// |c_0 |c_1 |c_2 |c_3 |c_4 |c_5 |
// |256K - 10|missing|256K|256K|256K - 10|not in range|
while (done < (CELLSIZE * (DATA_BLK_NUM - 1) - 2 * delta)) {
int ret = in.read(readBuffer, delta,
CELLSIZE * (DATA_BLK_NUM - 1) - 2 * delta);
assertTrue(ret > 0);
done += ret;
}
assertEquals(CELLSIZE * (DATA_BLK_NUM - 1) - delta, done);
// read the rest
int restSize;
restSize = readSize - done;
while (done < restSize) {
int ret = in.read(readBuffer, done, restSize);
assertTrue(ret > 0);
done += ret;
}
assertEquals(readSize, done);
assertArrayEquals(expected, readBuffer);
}
}

View File

@ -36,6 +36,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.nio.ByteBuffer;
import java.util.Random;
import static org.junit.Assert.assertEquals;
@ -242,7 +243,8 @@ public class TestStripedBlockUtil {
*/
@Test
public void testDivideByteRangeIntoStripes() {
byte[] assembled = new byte[BLK_GROUP_STRIPE_NUM * FULL_STRIPE_SIZE];
ByteBuffer assembled =
ByteBuffer.allocate(BLK_GROUP_STRIPE_NUM * FULL_STRIPE_SIZE);
for (int bgSize : blockGroupSizes) {
LocatedStripedBlock blockGroup = createDummyLocatedBlock(bgSize);
byte[][] internalBlkBufs = createInternalBlkBuffers(bgSize);
@ -252,7 +254,7 @@ public class TestStripedBlockUtil {
continue;
}
AlignedStripe[] stripes = divideByteRangeIntoStripes(EC_POLICY,
CELLSIZE, blockGroup, brStart, brStart + brSize - 1, assembled, 0);
CELLSIZE, blockGroup, brStart, brStart + brSize - 1, assembled);
for (AlignedStripe stripe : stripes) {
for (int i = 0; i < DATA_BLK_NUM; i++) {
@ -261,21 +263,21 @@ public class TestStripedBlockUtil {
continue;
}
int done = 0;
for (int j = 0; j < chunk.byteArray.getLengths().length; j++) {
System.arraycopy(internalBlkBufs[i],
(int) stripe.getOffsetInBlock() + done, assembled,
chunk.byteArray.getOffsets()[j],
chunk.byteArray.getLengths()[j]);
done += chunk.byteArray.getLengths()[j];
int len;
for (ByteBuffer slice : chunk.getChunkBuffer().getSlices()) {
len = slice.remaining();
slice.put(internalBlkBufs[i],
(int) stripe.getOffsetInBlock() + done, len);
done += len;
}
}
}
for (int i = 0; i < brSize; i++) {
if (hashIntToByte(brStart + i) != assembled[i]) {
if (hashIntToByte(brStart + i) != assembled.get(i)) {
System.out.println("Oops");
}
assertEquals("Byte at " + (brStart + i) + " should be the same",
hashIntToByte(brStart + i), assembled[i]);
hashIntToByte(brStart + i), assembled.get(i));
}
}
}