HDFS-8282. Erasure coding: move striped reading logic to StripedBlockUtil. Contributed by Zhe Zhang.

This commit is contained in:
Zhe Zhang 2015-04-29 23:49:52 -07:00 committed by Zhe Zhang
parent b00c663877
commit f0628280c3
4 changed files with 186 additions and 113 deletions

View File

@ -146,3 +146,6 @@
HDFS-8272. Erasure Coding: simplify the retry logic in DFSStripedInputStream
(stateful read). (Jing Zhao via Zhe Zhang)
HDFS-8282. Erasure coding: move striped reading logic to StripedBlockUtil.
(Zhe Zhang)

View File

@ -17,12 +17,14 @@
*/
package org.apache.hadoop.hdfs;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.planReadPortions;
import org.apache.hadoop.net.NetUtils;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
@ -31,8 +33,6 @@ import org.apache.htrace.TraceScope;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
@ -69,59 +69,6 @@ import java.util.concurrent.Future;
* 3. pread with decode support: TODO: will be supported after HDFS-7678
*****************************************************************************/
public class DFSStripedInputStream extends DFSInputStream {
/**
* This method plans the read portion from each block in the stripe
* @param dataBlkNum The number of data blocks in the striping group
* @param cellSize The size of each striping cell
* @param startInBlk Starting offset in the striped block
* @param len Length of the read request
* @param bufOffset Initial offset in the result buffer
* @return array of {@link ReadPortion}, each representing the portion of I/O
* for an individual block in the group
*/
@VisibleForTesting
static ReadPortion[] planReadPortions(final int dataBlkNum,
final int cellSize, final long startInBlk, final int len, int bufOffset) {
ReadPortion[] results = new ReadPortion[dataBlkNum];
for (int i = 0; i < dataBlkNum; i++) {
results[i] = new ReadPortion();
}
// cellIdxInBlk is the index of the cell in the block
// E.g., cell_3 is the 2nd cell in blk_0
int cellIdxInBlk = (int) (startInBlk / (cellSize * dataBlkNum));
// blkIdxInGroup is the index of the block in the striped block group
// E.g., blk_2 is the 3rd block in the group
final int blkIdxInGroup = (int) (startInBlk / cellSize % dataBlkNum);
results[blkIdxInGroup].startOffsetInBlock = cellSize * cellIdxInBlk +
startInBlk % cellSize;
boolean crossStripe = false;
for (int i = 1; i < dataBlkNum; i++) {
if (blkIdxInGroup + i >= dataBlkNum && !crossStripe) {
cellIdxInBlk++;
crossStripe = true;
}
results[(blkIdxInGroup + i) % dataBlkNum].startOffsetInBlock =
cellSize * cellIdxInBlk;
}
int firstCellLen = Math.min(cellSize - (int) (startInBlk % cellSize), len);
results[blkIdxInGroup].offsetsInBuf.add(bufOffset);
results[blkIdxInGroup].lengths.add(firstCellLen);
results[blkIdxInGroup].readLength += firstCellLen;
int i = (blkIdxInGroup + 1) % dataBlkNum;
for (int done = firstCellLen; done < len; done += cellSize) {
ReadPortion rp = results[i];
rp.offsetsInBuf.add(done + bufOffset);
final int readLen = Math.min(len - done, cellSize);
rp.lengths.add(readLen);
rp.readLength += readLen;
i = (i + 1) % dataBlkNum;
}
return results;
}
private static class ReaderRetryPolicy {
private int fetchEncryptionKeyTimes = 1;
@ -520,56 +467,4 @@ public class DFSStripedInputStream extends DFSInputStream {
}
throw new InterruptedException("let's retry");
}
/**
* This class represents the portion of I/O associated with each block in the
* striped block group.
*/
static class ReadPortion {
/**
* startOffsetInBlock
* |
* v
* |<-lengths[0]->|<- lengths[1] ->|<-lengths[2]->|
* +------------------+------------------+----------------+
* | cell_0 | cell_3 | cell_6 | <- blk_0
* +------------------+------------------+----------------+
* _/ \_______________________
* | |
* v offsetsInBuf[0] v offsetsInBuf[1]
* +------------------------------------------------------+
* | cell_0 | cell_1 and cell_2 |cell_3 ...| <- buf
* | (partial) | (from blk_1 and blk_2) | |
* +------------------------------------------------------+
*/
private long startOffsetInBlock = 0;
private int readLength = 0;
private final List<Integer> offsetsInBuf = new ArrayList<>();
private final List<Integer> lengths = new ArrayList<>();
int[] getOffsets() {
int[] offsets = new int[offsetsInBuf.size()];
for (int i = 0; i < offsets.length; i++) {
offsets[i] = offsetsInBuf.get(i);
}
return offsets;
}
int[] getLengths() {
int[] lens = new int[this.lengths.size()];
for (int i = 0; i < lens.length; i++) {
lens[i] = this.lengths.get(i);
}
return lens;
}
int getReadLength() {
return readLength;
}
long getStartOffsetInBlock() {
return startOffsetInBlock;
}
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.util;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -27,6 +28,15 @@ import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* Utility class for analyzing striped block groups
*/
@ -134,4 +144,168 @@ public class StripedBlockUtil {
+ offsetInBlk % cellSize; // partial cell
}
/**
* This method plans the read portion from each block in the stripe
* @param dataBlkNum The number of data blocks in the striping group
* @param cellSize The size of each striping cell
* @param startInBlk Starting offset in the striped block
* @param len Length of the read request
* @param bufOffset Initial offset in the result buffer
* @return array of {@link ReadPortion}, each representing the portion of I/O
* for an individual block in the group
*/
@VisibleForTesting
public static ReadPortion[] planReadPortions(final int dataBlkNum,
final int cellSize, final long startInBlk, final int len, int bufOffset) {
ReadPortion[] results = new ReadPortion[dataBlkNum];
for (int i = 0; i < dataBlkNum; i++) {
results[i] = new ReadPortion();
}
// cellIdxInBlk is the index of the cell in the block
// E.g., cell_3 is the 2nd cell in blk_0
int cellIdxInBlk = (int) (startInBlk / (cellSize * dataBlkNum));
// blkIdxInGroup is the index of the block in the striped block group
// E.g., blk_2 is the 3rd block in the group
final int blkIdxInGroup = (int) (startInBlk / cellSize % dataBlkNum);
results[blkIdxInGroup].startOffsetInBlock = cellSize * cellIdxInBlk +
startInBlk % cellSize;
boolean crossStripe = false;
for (int i = 1; i < dataBlkNum; i++) {
if (blkIdxInGroup + i >= dataBlkNum && !crossStripe) {
cellIdxInBlk++;
crossStripe = true;
}
results[(blkIdxInGroup + i) % dataBlkNum].startOffsetInBlock =
cellSize * cellIdxInBlk;
}
int firstCellLen = Math.min(cellSize - (int) (startInBlk % cellSize), len);
results[blkIdxInGroup].offsetsInBuf.add(bufOffset);
results[blkIdxInGroup].lengths.add(firstCellLen);
results[blkIdxInGroup].readLength += firstCellLen;
int i = (blkIdxInGroup + 1) % dataBlkNum;
for (int done = firstCellLen; done < len; done += cellSize) {
ReadPortion rp = results[i];
rp.offsetsInBuf.add(done + bufOffset);
final int readLen = Math.min(len - done, cellSize);
rp.lengths.add(readLen);
rp.readLength += readLen;
i = (i + 1) % dataBlkNum;
}
return results;
}
/**
* Get the next completed striped read task
*
* @return {@link StripedReadResult} indicating the status of the read task
* succeeded, and the block index of the task. If the method times
* out without getting any completed read tasks, -1 is returned as
* block index.
* @throws InterruptedException
*/
public static StripedReadResult getNextCompletedStripedRead(
CompletionService<Void> readService, Map<Future<Void>,
Integer> futures, final long threshold) throws InterruptedException {
Preconditions.checkArgument(!futures.isEmpty());
Preconditions.checkArgument(threshold > 0);
Future<Void> future = null;
try {
future = readService.poll(threshold, TimeUnit.MILLISECONDS);
if (future != null) {
future.get();
return new StripedReadResult(futures.remove(future),
StripedReadResult.SUCCESSFUL);
} else {
return new StripedReadResult(StripedReadResult.TIMEOUT);
}
} catch (ExecutionException e) {
return new StripedReadResult(futures.remove(future),
StripedReadResult.FAILED);
} catch (CancellationException e) {
return new StripedReadResult(futures.remove(future),
StripedReadResult.CANCELLED);
}
}
/**
* This class represents the portion of I/O associated with each block in the
* striped block group.
*/
public static class ReadPortion {
/**
* startOffsetInBlock
* |
* v
* |<-lengths[0]->|<- lengths[1] ->|<-lengths[2]->|
* +------------------+------------------+----------------+
* | cell_0 | cell_3 | cell_6 | <- blk_0
* +------------------+------------------+----------------+
* _/ \_______________________
* | |
* v offsetsInBuf[0] v offsetsInBuf[1]
* +------------------------------------------------------+
* | cell_0 | cell_1 and cell_2 |cell_3 ...| <- buf
* | (partial) | (from blk_1 and blk_2) | |
* +------------------------------------------------------+
*/
public long startOffsetInBlock = 0;
public int readLength = 0;
public final List<Integer> offsetsInBuf = new ArrayList<>();
public final List<Integer> lengths = new ArrayList<>();
public int[] getOffsets() {
int[] offsets = new int[offsetsInBuf.size()];
for (int i = 0; i < offsets.length; i++) {
offsets[i] = offsetsInBuf.get(i);
}
return offsets;
}
public int[] getLengths() {
int[] lens = new int[this.lengths.size()];
for (int i = 0; i < lens.length; i++) {
lens[i] = this.lengths.get(i);
}
return lens;
}
public boolean containsReadPortion(ReadPortion rp) {
long end = startOffsetInBlock + readLength;
return startOffsetInBlock <= rp.startOffsetInBlock && end >=
rp.startOffsetInBlock + rp.readLength;
}
}
/**
* This class represents result from a striped read request.
* If the task was successful or the internal computation failed,
* an index is also returned.
*/
public static class StripedReadResult {
public static final int SUCCESSFUL = 0x01;
public static final int FAILED = 0x02;
public static final int TIMEOUT = 0x04;
public static final int CANCELLED = 0x08;
public final int index;
public final int state;
public StripedReadResult(int state) {
Preconditions.checkArgument(state == TIMEOUT,
"Only timeout result should return negative index.");
this.index = -1;
this.state = state;
}
public StripedReadResult(int index, int state) {
Preconditions.checkArgument(state != TIMEOUT,
"Timeout result should return negative index.");
this.index = index;
this.state = state;
}
}
}

View File

@ -19,7 +19,8 @@ package org.apache.hadoop.hdfs;
import org.junit.Test;
import static org.apache.hadoop.hdfs.DFSStripedInputStream.ReadPortion;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion;
import static org.junit.Assert.*;
public class TestPlanReadPortions {
@ -32,13 +33,13 @@ public class TestPlanReadPortions {
private void testPlanReadPortions(int startInBlk, int length,
int bufferOffset, int[] readLengths, int[] offsetsInBlock,
int[][] bufferOffsets, int[][] bufferLengths) {
ReadPortion[] results = DFSStripedInputStream.planReadPortions(GROUP_SIZE,
ReadPortion[] results = StripedBlockUtil.planReadPortions(GROUP_SIZE,
CELLSIZE, startInBlk, length, bufferOffset);
assertEquals(GROUP_SIZE, results.length);
for (int i = 0; i < GROUP_SIZE; i++) {
assertEquals(readLengths[i], results[i].getReadLength());
assertEquals(offsetsInBlock[i], results[i].getStartOffsetInBlock());
assertEquals(readLengths[i], results[i].readLength);
assertEquals(offsetsInBlock[i], results[i].startOffsetInBlock);
final int[] bOffsets = results[i].getOffsets();
assertArrayEquals(bufferOffsets[i], bOffsets);
final int[] bLengths = results[i].getLengths();
@ -47,7 +48,7 @@ public class TestPlanReadPortions {
}
/**
* Test {@link DFSStripedInputStream#planReadPortions}
* Test {@link StripedBlockUtil#planReadPortions}
*/
@Test
public void testPlanReadPortions() {