HDFS-8320. Erasure coding: consolidate striping-related terminologies. Contributed by Zhe Zhang and Jing Zhao.
This commit is contained in:
parent
b008348dbf
commit
7434c44b16
|
@ -223,3 +223,5 @@
|
|||
|
||||
HDFS-8418. Fix the isNeededReplication calculation for Striped block in NN.
|
||||
(Yi Liu via jing9)
|
||||
|
||||
HDFS-8320. Erasure coding: consolidate striping-related terminologies. (zhz)
|
||||
|
|
|
@ -23,19 +23,18 @@ import org.apache.hadoop.fs.ReadOption;
|
|||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||
import org.apache.hadoop.io.ByteBufferPool;
|
||||
|
||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.planReadPortions;
|
||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.divideByteRangeIntoStripes;
|
||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.initDecodeInputs;
|
||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.decodeAndFillBuffer;
|
||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getNextCompletedStripedRead;
|
||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion;
|
||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getStartOffsetsForInternalBlocks;
|
||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.parseStripedBlockGroup;
|
||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
|
||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
|
||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
|
||||
|
@ -65,30 +64,9 @@ import java.util.concurrent.CancellationException;
|
|||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
/******************************************************************************
|
||||
* DFSStripedInputStream reads from striped block groups, illustrated below:
|
||||
*
|
||||
* | <- Striped Block Group -> |
|
||||
* blk_0 blk_1 blk_2 <- A striped block group has
|
||||
* | | | {@link #dataBlkNum} blocks
|
||||
* v v v
|
||||
* +------+ +------+ +------+
|
||||
* |cell_0| |cell_1| |cell_2| <- The logical read order should be
|
||||
* +------+ +------+ +------+ cell_0, cell_1, ...
|
||||
* |cell_3| |cell_4| |cell_5|
|
||||
* +------+ +------+ +------+
|
||||
* |cell_6| |cell_7| |cell_8|
|
||||
* +------+ +------+ +------+
|
||||
* |cell_9|
|
||||
* +------+ <- A cell contains {@link #cellSize} bytes of data
|
||||
*
|
||||
* Three styles of read will eventually be supported:
|
||||
* 1. Stateful read
|
||||
* 2. pread without decode support
|
||||
* This is implemented by calculating the portion of read from each block and
|
||||
* issuing requests to each DataNode in parallel.
|
||||
* 3. pread with decode support: TODO: will be supported after HDFS-7678
|
||||
*****************************************************************************/
|
||||
/**
|
||||
* DFSStripedInputStream reads from striped block groups
|
||||
*/
|
||||
public class DFSStripedInputStream extends DFSInputStream {
|
||||
|
||||
private static class ReaderRetryPolicy {
|
||||
|
@ -207,22 +185,24 @@ public class DFSStripedInputStream extends DFSInputStream {
|
|||
currentLocatedBlock = targetBlockGroup;
|
||||
|
||||
final long offsetIntoBlockGroup = getOffsetInBlockGroup();
|
||||
LocatedBlock[] targetBlocks = StripedBlockUtil.parseStripedBlockGroup(
|
||||
LocatedBlock[] targetBlocks = parseStripedBlockGroup(
|
||||
targetBlockGroup, cellSize, dataBlkNum, parityBlkNum);
|
||||
// The purpose is to get start offset into each block
|
||||
ReadPortion[] readPortions = planReadPortions(groupSize, cellSize,
|
||||
offsetIntoBlockGroup, 0, 0);
|
||||
// The purpose is to get start offset into each block.
|
||||
long[] offsetsForInternalBlocks = getStartOffsetsForInternalBlocks(schema,
|
||||
targetBlockGroup, offsetIntoBlockGroup);
|
||||
Preconditions.checkNotNull(offsetsForInternalBlocks);
|
||||
|
||||
final ReaderRetryPolicy retry = new ReaderRetryPolicy();
|
||||
for (int i = 0; i < groupSize; i++) {
|
||||
LocatedBlock targetBlock = targetBlocks[i];
|
||||
if (targetBlock != null) {
|
||||
long offsetInBlock = offsetsForInternalBlocks[i] < 0 ?
|
||||
0 : offsetsForInternalBlocks[i];
|
||||
DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null);
|
||||
if (retval != null) {
|
||||
currentNodes[i] = retval.info;
|
||||
blockReaders[i] = getBlockReaderWithRetry(targetBlock,
|
||||
readPortions[i].getStartOffsetInBlock(),
|
||||
targetBlock.getBlockSize() - readPortions[i].getStartOffsetInBlock(),
|
||||
offsetInBlock, targetBlock.getBlockSize() - offsetInBlock,
|
||||
retval.addr, retval.storageType, retval.info, target, retry);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -301,12 +301,12 @@ public final class ErasureCodingWorker {
|
|||
}
|
||||
|
||||
private ExtendedBlock getBlock(ExtendedBlock blockGroup, int i) {
|
||||
return StripedBlockUtil.constructStripedBlock(blockGroup, cellSize,
|
||||
return StripedBlockUtil.constructInternalBlock(blockGroup, cellSize,
|
||||
dataBlkNum, i);
|
||||
}
|
||||
|
||||
private long getBlockLen(ExtendedBlock blockGroup, int i) {
|
||||
return StripedBlockUtil.getStripedBlockLength(blockGroup.getNumBytes(),
|
||||
return StripedBlockUtil.getInternalBlockLength(blockGroup.getNumBytes(),
|
||||
cellSize, dataBlkNum, i);
|
||||
}
|
||||
|
||||
|
|
|
@ -41,7 +41,28 @@ import java.util.concurrent.Future;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Utility class for analyzing striped block groups
|
||||
* When accessing a file in striped layout, operations on logical byte ranges
|
||||
* in the file need to be mapped to physical byte ranges on block files stored
|
||||
* on DataNodes. This utility class facilities this mapping by defining and
|
||||
* exposing a number of striping-related concepts. The most basic ones are
|
||||
* illustrated in the following diagram. Unless otherwise specified, all
|
||||
* range-related calculations are inclusive (the end offset of the previous
|
||||
* range should be 1 byte lower than the start offset of the next one).
|
||||
*
|
||||
* | <---- Block Group ----> | <- Block Group: logical unit composing
|
||||
* | | striped HDFS files.
|
||||
* blk_0 blk_1 blk_2 <- Internal Blocks: each internal block
|
||||
* | | | represents a physically stored local
|
||||
* v v v block file
|
||||
* +------+ +------+ +------+
|
||||
* |cell_0| |cell_1| |cell_2| <- {@link StripingCell} represents the
|
||||
* +------+ +------+ +------+ logical order that a Block Group should
|
||||
* |cell_3| |cell_4| |cell_5| be accessed: cell_0, cell_1, ...
|
||||
* +------+ +------+ +------+
|
||||
* |cell_6| |cell_7| |cell_8|
|
||||
* +------+ +------+ +------+
|
||||
* |cell_9|
|
||||
* +------+ <- A cell contains cellSize bytes of data
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class StripedBlockUtil {
|
||||
|
@ -103,31 +124,6 @@ public class StripedBlockUtil {
|
|||
cellSize, dataBlkNum, idxInBlockGroup));
|
||||
return block;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method creates an internal {@link ExtendedBlock} at the given index
|
||||
* of a block group, for both data and parity block.
|
||||
*/
|
||||
public static ExtendedBlock constructStripedBlock(ExtendedBlock blockGroup,
|
||||
int cellSize, int dataBlkNum, int idxInBlockGroup) {
|
||||
ExtendedBlock block = new ExtendedBlock(blockGroup);
|
||||
block.setBlockId(blockGroup.getBlockId() + idxInBlockGroup);
|
||||
block.setNumBytes(getStripedBlockLength(blockGroup.getNumBytes(), cellSize,
|
||||
dataBlkNum, idxInBlockGroup));
|
||||
return block;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an internal block length at the given index of a block group,
|
||||
* for both data and parity block.
|
||||
*/
|
||||
public static long getStripedBlockLength(long numBytes, int cellSize,
|
||||
int dataBlkNum, int idxInBlockGroup) {
|
||||
// parity block length is the same as the first striped block length.
|
||||
return StripedBlockUtil.getInternalBlockLength(
|
||||
numBytes, cellSize, dataBlkNum,
|
||||
idxInBlockGroup < dataBlkNum ? idxInBlockGroup : 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the size of an internal block at the given index of a block group
|
||||
|
@ -157,7 +153,7 @@ public class StripedBlockUtil {
|
|||
return (numStripes - 1L)*cellSize
|
||||
+ lastCellSize(lastStripeDataLen, cellSize, numDataBlocks, i);
|
||||
}
|
||||
|
||||
|
||||
private static int lastCellSize(int size, int cellSize, int numDataBlocks,
|
||||
int i) {
|
||||
if (i < numDataBlocks) {
|
||||
|
@ -183,60 +179,6 @@ public class StripedBlockUtil {
|
|||
+ offsetInBlk % cellSize; // partial cell
|
||||
}
|
||||
|
||||
/**
|
||||
* This method plans the read portion from each block in the stripe
|
||||
* @param dataBlkNum The number of data blocks in the striping group
|
||||
* @param cellSize The size of each striping cell
|
||||
* @param startInBlk Starting offset in the striped block
|
||||
* @param len Length of the read request
|
||||
* @param bufOffset Initial offset in the result buffer
|
||||
* @return array of {@link ReadPortion}, each representing the portion of I/O
|
||||
* for an individual block in the group
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public static ReadPortion[] planReadPortions(final int dataBlkNum,
|
||||
final int cellSize, final long startInBlk, final int len, int bufOffset) {
|
||||
ReadPortion[] results = new ReadPortion[dataBlkNum];
|
||||
for (int i = 0; i < dataBlkNum; i++) {
|
||||
results[i] = new ReadPortion();
|
||||
}
|
||||
|
||||
// cellIdxInBlk is the index of the cell in the block
|
||||
// E.g., cell_3 is the 2nd cell in blk_0
|
||||
int cellIdxInBlk = (int) (startInBlk / (cellSize * dataBlkNum));
|
||||
|
||||
// blkIdxInGroup is the index of the block in the striped block group
|
||||
// E.g., blk_2 is the 3rd block in the group
|
||||
final int blkIdxInGroup = (int) (startInBlk / cellSize % dataBlkNum);
|
||||
results[blkIdxInGroup].setStartOffsetInBlock(cellSize * cellIdxInBlk +
|
||||
startInBlk % cellSize);
|
||||
boolean crossStripe = false;
|
||||
for (int i = 1; i < dataBlkNum; i++) {
|
||||
if (blkIdxInGroup + i >= dataBlkNum && !crossStripe) {
|
||||
cellIdxInBlk++;
|
||||
crossStripe = true;
|
||||
}
|
||||
results[(blkIdxInGroup + i) % dataBlkNum].setStartOffsetInBlock(
|
||||
cellSize * cellIdxInBlk);
|
||||
}
|
||||
|
||||
int firstCellLen = Math.min(cellSize - (int) (startInBlk % cellSize), len);
|
||||
results[blkIdxInGroup].offsetsInBuf.add(bufOffset);
|
||||
results[blkIdxInGroup].lengths.add(firstCellLen);
|
||||
results[blkIdxInGroup].addReadLength(firstCellLen);
|
||||
|
||||
int i = (blkIdxInGroup + 1) % dataBlkNum;
|
||||
for (int done = firstCellLen; done < len; done += cellSize) {
|
||||
ReadPortion rp = results[i];
|
||||
rp.offsetsInBuf.add(done + bufOffset);
|
||||
final int readLen = Math.min(len - done, cellSize);
|
||||
rp.lengths.add(readLen);
|
||||
rp.addReadLength(readLen);
|
||||
i = (i + 1) % dataBlkNum;
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the next completed striped read task
|
||||
*
|
||||
|
@ -360,84 +302,167 @@ public class StripedBlockUtil {
|
|||
}
|
||||
|
||||
/**
|
||||
* This method divides a requested byte range into an array of
|
||||
* {@link AlignedStripe}
|
||||
* This method divides a requested byte range into an array of inclusive
|
||||
* {@link AlignedStripe}.
|
||||
* @param ecSchema The codec schema for the file, which carries the numbers
|
||||
* of data / parity blocks, as well as cell size
|
||||
* @param blockGroup The striped block group
|
||||
* @param rangeStartInBlockGroup The byte range's start offset in block group
|
||||
* @param rangeEndInBlockGroup The byte range's end offset in block group
|
||||
* @param buf Destination buffer of the read operation for the byte range
|
||||
* @param offsetInBuf Start offset into the destination buffer
|
||||
*
|
||||
*
|
||||
* At most 5 stripes will be generated from each logical range
|
||||
* TODO: cleanup and get rid of planReadPortions
|
||||
* At most 5 stripes will be generated from each logical range, as
|
||||
* demonstrated in the header of {@link AlignedStripe}.
|
||||
*/
|
||||
public static AlignedStripe[] divideByteRangeIntoStripes (
|
||||
ECSchema ecSchema, LocatedStripedBlock blockGroup, long start, long end,
|
||||
byte[] buf, int offsetInBuf) {
|
||||
ECSchema ecSchema, LocatedStripedBlock blockGroup,
|
||||
long rangeStartInBlockGroup, long rangeEndInBlockGroup, byte[] buf,
|
||||
int offsetInBuf) {
|
||||
// TODO: change ECSchema naming to use cell size instead of chunk size
|
||||
|
||||
// Step 0: analyze range and calculate basic parameters
|
||||
int cellSize = ecSchema.getChunkSize();
|
||||
int dataBlkNum = ecSchema.getNumDataUnits();
|
||||
int len = (int) (end - start + 1);
|
||||
int firstCellIdxInBG = (int) (start / cellSize);
|
||||
int lastCellIdxInBG = (int) (end / cellSize);
|
||||
int firstCellSize = Math.min(cellSize - (int) (start % cellSize), len);
|
||||
long firstCellOffsetInBlk = firstCellIdxInBG / dataBlkNum * cellSize +
|
||||
start % cellSize;
|
||||
int lastCellSize = lastCellIdxInBG == firstCellIdxInBG ?
|
||||
firstCellSize : (int) (end % cellSize) + 1;
|
||||
|
||||
// Step 1: get the unmerged ranges on each internal block
|
||||
// TODO: StripingCell should carry info on size and start offset (HDFS-8320)
|
||||
VerticalRange[] ranges = getRangesForInternalBlocks(ecSchema,
|
||||
firstCellIdxInBG, lastCellIdxInBG, firstCellSize, firstCellOffsetInBlk,
|
||||
lastCellSize);
|
||||
// Step 1: map the byte range to StripingCells
|
||||
StripingCell[] cells = getStripingCellsOfByteRange(ecSchema, blockGroup,
|
||||
rangeStartInBlockGroup, rangeEndInBlockGroup);
|
||||
|
||||
// Step 2: merge into at most 5 stripes
|
||||
// Step 2: get the unmerged ranges on each internal block
|
||||
VerticalRange[] ranges = getRangesForInternalBlocks(ecSchema, cells);
|
||||
|
||||
// Step 3: merge into at most 5 stripes
|
||||
AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecSchema, ranges);
|
||||
|
||||
// Step 3: calculate each chunk's position in destination buffer
|
||||
calcualteChunkPositionsInBuf(ecSchema, blockGroup, buf, offsetInBuf,
|
||||
firstCellIdxInBG, lastCellIdxInBG, firstCellSize, firstCellOffsetInBlk,
|
||||
lastCellSize, stripes);
|
||||
// Step 4: calculate each chunk's position in destination buffer
|
||||
calcualteChunkPositionsInBuf(ecSchema, stripes, cells, buf, offsetInBuf);
|
||||
|
||||
// Step 4: prepare ALLZERO blocks
|
||||
// Step 5: prepare ALLZERO blocks
|
||||
prepareAllZeroChunks(blockGroup, buf, stripes, cellSize, dataBlkNum);
|
||||
|
||||
return stripes;
|
||||
}
|
||||
|
||||
private static VerticalRange[] getRangesForInternalBlocks (ECSchema ecSchema,
|
||||
int firstCellIdxInBG, int lastCellIdxInBG, int firstCellSize,
|
||||
long firstCellOffsetInBlk, int lastCellSize) {
|
||||
/**
|
||||
* Map the logical byte range to a set of inclusive {@link StripingCell}
|
||||
* instances, each representing the overlap of the byte range to a cell
|
||||
* used by {@link DFSStripedOutputStream} in encoding
|
||||
*/
|
||||
@VisibleForTesting
|
||||
private static StripingCell[] getStripingCellsOfByteRange(ECSchema ecSchema,
|
||||
LocatedStripedBlock blockGroup,
|
||||
long rangeStartInBlockGroup, long rangeEndInBlockGroup) {
|
||||
Preconditions.checkArgument(
|
||||
rangeStartInBlockGroup <= rangeEndInBlockGroup &&
|
||||
rangeEndInBlockGroup < blockGroup.getBlockSize());
|
||||
int cellSize = ecSchema.getChunkSize();
|
||||
int len = (int) (rangeEndInBlockGroup - rangeStartInBlockGroup + 1);
|
||||
int firstCellIdxInBG = (int) (rangeStartInBlockGroup / cellSize);
|
||||
int lastCellIdxInBG = (int) (rangeEndInBlockGroup / cellSize);
|
||||
int numCells = lastCellIdxInBG - firstCellIdxInBG + 1;
|
||||
StripingCell[] cells = new StripingCell[numCells];
|
||||
cells[0] = new StripingCell(ecSchema, firstCellIdxInBG);
|
||||
cells[numCells - 1] = new StripingCell(ecSchema, lastCellIdxInBG);
|
||||
|
||||
cells[0].offset = (int) (rangeStartInBlockGroup % cellSize);
|
||||
cells[0].size =
|
||||
Math.min(cellSize - (int) (rangeStartInBlockGroup % cellSize), len);
|
||||
if (lastCellIdxInBG != firstCellIdxInBG) {
|
||||
cells[numCells - 1].size = (int) (rangeEndInBlockGroup % cellSize) + 1;
|
||||
}
|
||||
|
||||
for (int i = 1; i < numCells - 1; i++) {
|
||||
cells[i] = new StripingCell(ecSchema, i + firstCellIdxInBG);
|
||||
}
|
||||
|
||||
return cells;
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a logical start offset in a block group, calculate the physical
|
||||
* start offset into each stored internal block.
|
||||
*/
|
||||
public static long[] getStartOffsetsForInternalBlocks(
|
||||
ECSchema ecSchema, LocatedStripedBlock blockGroup,
|
||||
long rangeStartInBlockGroup) {
|
||||
Preconditions.checkArgument(
|
||||
rangeStartInBlockGroup < blockGroup.getBlockSize());
|
||||
int dataBlkNum = ecSchema.getNumDataUnits();
|
||||
|
||||
int parityBlkNum = ecSchema.getNumParityUnits();
|
||||
int cellSize = ecSchema.getChunkSize();
|
||||
long[] startOffsets = new long[dataBlkNum + parityBlkNum];
|
||||
Arrays.fill(startOffsets, -1L);
|
||||
int firstCellIdxInBG = (int) (rangeStartInBlockGroup / cellSize);
|
||||
StripingCell firstCell = new StripingCell(ecSchema, firstCellIdxInBG);
|
||||
StripingCell lastCell = new StripingCell(ecSchema, lastCellIdxInBG);
|
||||
|
||||
VerticalRange ranges[] = new VerticalRange[dataBlkNum];
|
||||
ranges[firstCell.idxInStripe] =
|
||||
new VerticalRange(firstCellOffsetInBlk, firstCellSize);
|
||||
for (int i = firstCellIdxInBG + 1; i < lastCellIdxInBG; i++) {
|
||||
// iterate through all cells and update the list of StripeRanges
|
||||
StripingCell cell = new StripingCell(ecSchema, i);
|
||||
if (ranges[cell.idxInStripe] == null) {
|
||||
ranges[cell.idxInStripe] = new VerticalRange(
|
||||
cell.idxInInternalBlk * cellSize, cellSize);
|
||||
} else {
|
||||
ranges[cell.idxInStripe].spanInBlock += cellSize;
|
||||
firstCell.offset = (int) (rangeStartInBlockGroup % cellSize);
|
||||
startOffsets[firstCell.idxInStripe] =
|
||||
firstCell.idxInInternalBlk * cellSize + firstCell.offset;
|
||||
long earliestStart = startOffsets[firstCell.idxInStripe];
|
||||
for (int i = 1; i < dataBlkNum; i++) {
|
||||
int idx = firstCellIdxInBG + i;
|
||||
if (idx * cellSize >= blockGroup.getBlockSize()) {
|
||||
break;
|
||||
}
|
||||
StripingCell cell = new StripingCell(ecSchema, idx);
|
||||
startOffsets[cell.idxInStripe] = cell.idxInInternalBlk * cellSize;
|
||||
if (startOffsets[cell.idxInStripe] < earliestStart) {
|
||||
earliestStart = startOffsets[cell.idxInStripe];
|
||||
}
|
||||
}
|
||||
if (ranges[lastCell.idxInStripe] == null) {
|
||||
ranges[lastCell.idxInStripe] = new VerticalRange(
|
||||
lastCell.idxInInternalBlk * cellSize, lastCellSize);
|
||||
} else if (lastCell.idxInBlkGroup != firstCell.idxInBlkGroup) {
|
||||
ranges[lastCell.idxInStripe].spanInBlock += lastCellSize;
|
||||
for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) {
|
||||
startOffsets[i] = earliestStart;
|
||||
}
|
||||
return startOffsets;
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a logical byte range, mapped to each {@link StripingCell}, calculate
|
||||
* the physical byte range (inclusive) on each stored internal block.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
private static VerticalRange[] getRangesForInternalBlocks(ECSchema ecSchema,
|
||||
StripingCell[] cells) {
|
||||
int cellSize = ecSchema.getChunkSize();
|
||||
int dataBlkNum = ecSchema.getNumDataUnits();
|
||||
int parityBlkNum = ecSchema.getNumParityUnits();
|
||||
|
||||
VerticalRange ranges[] = new VerticalRange[dataBlkNum + parityBlkNum];
|
||||
|
||||
long earliestStart = Long.MAX_VALUE;
|
||||
long latestEnd = -1;
|
||||
for (StripingCell cell : cells) {
|
||||
// iterate through all cells and update the list of StripeRanges
|
||||
if (ranges[cell.idxInStripe] == null) {
|
||||
ranges[cell.idxInStripe] = new VerticalRange(
|
||||
cell.idxInInternalBlk * cellSize + cell.offset, cell.size);
|
||||
} else {
|
||||
ranges[cell.idxInStripe].spanInBlock += cell.size;
|
||||
}
|
||||
VerticalRange range = ranges[cell.idxInStripe];
|
||||
if (range.offsetInBlock < earliestStart) {
|
||||
earliestStart = range.offsetInBlock;
|
||||
}
|
||||
if (range.offsetInBlock + range.spanInBlock - 1 > latestEnd) {
|
||||
latestEnd = range.offsetInBlock + range.spanInBlock - 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Each parity block should be fetched at maximum range of all data blocks
|
||||
for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) {
|
||||
ranges[i] = new VerticalRange(earliestStart,
|
||||
latestEnd - earliestStart + 1);
|
||||
}
|
||||
|
||||
return ranges;
|
||||
}
|
||||
|
||||
private static AlignedStripe[] mergeRangesForInternalBlocks(ECSchema ecSchema,
|
||||
VerticalRange[] ranges) {
|
||||
/**
|
||||
* Merge byte ranges on each internal block into a set of inclusive
|
||||
* {@link AlignedStripe} instances.
|
||||
*/
|
||||
private static AlignedStripe[] mergeRangesForInternalBlocks(
|
||||
ECSchema ecSchema, VerticalRange[] ranges) {
|
||||
int dataBlkNum = ecSchema.getNumDataUnits();
|
||||
int parityBlkNum = ecSchema.getNumParityUnits();
|
||||
List<AlignedStripe> stripes = new ArrayList<>();
|
||||
|
@ -461,12 +486,8 @@ public class StripedBlockUtil {
|
|||
}
|
||||
|
||||
private static void calcualteChunkPositionsInBuf(ECSchema ecSchema,
|
||||
LocatedStripedBlock blockGroup, byte[] buf, int offsetInBuf,
|
||||
int firstCellIdxInBG, int lastCellIdxInBG, int firstCellSize,
|
||||
long firstCellOffsetInBlk, int lastCellSize, AlignedStripe[] stripes) {
|
||||
int cellSize = ecSchema.getChunkSize();
|
||||
int dataBlkNum = ecSchema.getNumDataUnits();
|
||||
// Step 3: calculate each chunk's position in destination buffer
|
||||
AlignedStripe[] stripes, StripingCell[] cells, byte[] buf,
|
||||
int offsetInBuf) {
|
||||
/**
|
||||
* | <--------------- AlignedStripe --------------->|
|
||||
*
|
||||
|
@ -484,20 +505,11 @@ public class StripedBlockUtil {
|
|||
*
|
||||
* Cell indexing convention defined in {@link StripingCell}
|
||||
*/
|
||||
int cellSize = ecSchema.getChunkSize();
|
||||
int done = 0;
|
||||
for (int i = firstCellIdxInBG; i <= lastCellIdxInBG; i++) {
|
||||
StripingCell cell = new StripingCell(ecSchema, i);
|
||||
long cellStart = i == firstCellIdxInBG ?
|
||||
firstCellOffsetInBlk : cell.idxInInternalBlk * cellSize;
|
||||
int cellLen;
|
||||
if (i == firstCellIdxInBG) {
|
||||
cellLen = firstCellSize;
|
||||
} else if (i == lastCellIdxInBG) {
|
||||
cellLen = lastCellSize;
|
||||
} else {
|
||||
cellLen = cellSize;
|
||||
}
|
||||
long cellEnd = cellStart + cellLen - 1;
|
||||
for (StripingCell cell : cells) {
|
||||
long cellStart = cell.idxInInternalBlk * cellSize + cell.offset;
|
||||
long cellEnd = cellStart + cell.size - 1;
|
||||
for (AlignedStripe s : stripes) {
|
||||
long stripeEnd = s.getOffsetInBlock() + s.getSpanInBlock() - 1;
|
||||
long overlapStart = Math.max(cellStart, s.getOffsetInBlock());
|
||||
|
@ -514,10 +526,14 @@ public class StripedBlockUtil {
|
|||
add((int)(offsetInBuf + done + overlapStart - cellStart));
|
||||
s.chunks[cell.idxInStripe].lengthsInBuf.add(overLapLen);
|
||||
}
|
||||
done += cellLen;
|
||||
done += cell.size;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If a {@link StripingChunk} maps to a byte range beyond an internal block's
|
||||
* size, the chunk should be treated as zero bytes in decoding.
|
||||
*/
|
||||
private static void prepareAllZeroChunks(LocatedStripedBlock blockGroup,
|
||||
byte[] buf, AlignedStripe[] stripes, int cellSize, int dataBlkNum) {
|
||||
for (AlignedStripe s : stripes) {
|
||||
|
@ -534,51 +550,13 @@ public class StripedBlockUtil {
|
|||
}
|
||||
|
||||
/**
|
||||
* This class represents the portion of I/O associated with each block in the
|
||||
* striped block group.
|
||||
* TODO: consolidate ReadPortion with AlignedStripe
|
||||
*/
|
||||
public static class ReadPortion {
|
||||
private long startOffsetInBlock = 0;
|
||||
private int readLength = 0;
|
||||
public final List<Integer> offsetsInBuf = new ArrayList<>();
|
||||
public final List<Integer> lengths = new ArrayList<>();
|
||||
|
||||
public int[] getOffsets() {
|
||||
int[] offsets = new int[offsetsInBuf.size()];
|
||||
for (int i = 0; i < offsets.length; i++) {
|
||||
offsets[i] = offsetsInBuf.get(i);
|
||||
}
|
||||
return offsets;
|
||||
}
|
||||
|
||||
public int[] getLengths() {
|
||||
int[] lens = new int[this.lengths.size()];
|
||||
for (int i = 0; i < lens.length; i++) {
|
||||
lens[i] = this.lengths.get(i);
|
||||
}
|
||||
return lens;
|
||||
}
|
||||
|
||||
public long getStartOffsetInBlock() {
|
||||
return startOffsetInBlock;
|
||||
}
|
||||
|
||||
public int getReadLength() {
|
||||
return readLength;
|
||||
}
|
||||
|
||||
public void setStartOffsetInBlock(long startOffsetInBlock) {
|
||||
this.startOffsetInBlock = startOffsetInBlock;
|
||||
}
|
||||
|
||||
void addReadLength(int extraLength) {
|
||||
this.readLength += extraLength;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The unit of encoding used in {@link DFSStripedOutputStream}
|
||||
* Cell is the unit of encoding used in {@link DFSStripedOutputStream}. This
|
||||
* size impacts how a logical offset in the file or block group translates
|
||||
* to physical byte offset in a stored internal block. The StripingCell util
|
||||
* class facilitates this calculation. Each StripingCell is inclusive with
|
||||
* its start and end offsets -- e.g., the end logical offset of cell_0_0_0
|
||||
* should be 1 byte lower than the start logical offset of cell_1_0_1.
|
||||
*
|
||||
* | <------- Striped Block Group -------> |
|
||||
* blk_0 blk_1 blk_2
|
||||
* | | |
|
||||
|
@ -586,43 +564,57 @@ public class StripedBlockUtil {
|
|||
* +----------+ +----------+ +----------+
|
||||
* |cell_0_0_0| |cell_1_0_1| |cell_2_0_2|
|
||||
* +----------+ +----------+ +----------+
|
||||
* |cell_3_1_0| |cell_4_1_1| |cell_5_1_2| <- {@link idxInBlkGroup} = 5
|
||||
* +----------+ +----------+ +----------+ {@link idxInInternalBlk} = 1
|
||||
* {@link idxInStripe} = 2
|
||||
* |cell_3_1_0| |cell_4_1_1| |cell_5_1_2| <- {@link #idxInBlkGroup} = 5
|
||||
* +----------+ +----------+ +----------+ {@link #idxInInternalBlk} = 1
|
||||
* {@link #idxInStripe} = 2
|
||||
* A StripingCell is a special instance of {@link StripingChunk} whose offset
|
||||
* and size align with the cell used when writing data.
|
||||
* TODO: consider parity cells
|
||||
*/
|
||||
public static class StripingCell {
|
||||
@VisibleForTesting
|
||||
static class StripingCell {
|
||||
public final ECSchema schema;
|
||||
/** Logical order in a block group, used when doing I/O to a block group */
|
||||
public final int idxInBlkGroup;
|
||||
public final int idxInInternalBlk;
|
||||
public final int idxInStripe;
|
||||
final int idxInBlkGroup;
|
||||
final int idxInInternalBlk;
|
||||
final int idxInStripe;
|
||||
/**
|
||||
* When a logical byte range is mapped to a set of cells, it might
|
||||
* partially overlap with the first and last cells. This field and the
|
||||
* {@link #size} variable represent the start offset and size of the
|
||||
* overlap.
|
||||
*/
|
||||
int offset;
|
||||
int size;
|
||||
|
||||
public StripingCell(ECSchema ecSchema, int idxInBlkGroup) {
|
||||
StripingCell(ECSchema ecSchema, int idxInBlkGroup) {
|
||||
this.schema = ecSchema;
|
||||
this.idxInBlkGroup = idxInBlkGroup;
|
||||
this.idxInInternalBlk = idxInBlkGroup / ecSchema.getNumDataUnits();
|
||||
this.idxInStripe = idxInBlkGroup -
|
||||
this.idxInInternalBlk * ecSchema.getNumDataUnits();
|
||||
this.offset = 0;
|
||||
this.size = ecSchema.getChunkSize();
|
||||
}
|
||||
|
||||
public StripingCell(ECSchema ecSchema, int idxInInternalBlk,
|
||||
StripingCell(ECSchema ecSchema, int idxInInternalBlk,
|
||||
int idxInStripe) {
|
||||
this.schema = ecSchema;
|
||||
this.idxInInternalBlk = idxInInternalBlk;
|
||||
this.idxInStripe = idxInStripe;
|
||||
this.idxInBlkGroup =
|
||||
idxInInternalBlk * ecSchema.getNumDataUnits() + idxInStripe;
|
||||
this.offset = 0;
|
||||
this.size = ecSchema.getChunkSize();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a requested byte range on a striped block group, an AlignedStripe
|
||||
* represents a {@link VerticalRange} that is aligned with both the byte range
|
||||
* and boundaries of all internal blocks. As illustrated in the diagram, any
|
||||
* given byte range on a block group leads to 1~5 AlignedStripe's.
|
||||
* represents an inclusive {@link VerticalRange} that is aligned with both
|
||||
* the byte range and boundaries of all internal blocks. As illustrated in
|
||||
* the diagram, any given byte range on a block group leads to 1~5
|
||||
* AlignedStripe's.
|
||||
*
|
||||
* |<-------- Striped Block Group -------->|
|
||||
* blk_0 blk_1 blk_2 blk_3 blk_4
|
||||
|
@ -648,6 +640,7 @@ public class StripedBlockUtil {
|
|||
*
|
||||
* The coverage of an AlignedStripe on an internal block is represented as a
|
||||
* {@link StripingChunk}.
|
||||
*
|
||||
* To simplify the logic of reading a logical byte range from a block group,
|
||||
* a StripingChunk is either completely in the requested byte range or
|
||||
* completely outside the requested byte range.
|
||||
|
@ -692,19 +685,19 @@ public class StripedBlockUtil {
|
|||
|
||||
/**
|
||||
* A simple utility class representing an arbitrary vertical inclusive range
|
||||
* starting at {@link offsetInBlock} and lasting for {@link length} bytes in
|
||||
* an internal block. Note that VerticalRange doesn't necessarily align with
|
||||
* {@link StripingCell}.
|
||||
* starting at {@link #offsetInBlock} and lasting for {@link #spanInBlock}
|
||||
* bytes in an internal block. Note that VerticalRange doesn't necessarily
|
||||
* align with {@link StripingCell}.
|
||||
*
|
||||
* |<- Striped Block Group ->|
|
||||
* blk_0
|
||||
* |
|
||||
* v
|
||||
* +-----+
|
||||
* |~~~~~| <-- {@link offsetInBlock}
|
||||
* |~~~~~| <-- {@link #offsetInBlock}
|
||||
* | | ^
|
||||
* | | |
|
||||
* | | | {@link spanInBlock}
|
||||
* | | | {@link #spanInBlock}
|
||||
* | | v
|
||||
* |~~~~~| ---
|
||||
* | |
|
||||
|
@ -743,9 +736,9 @@ public class StripedBlockUtil {
|
|||
* +---------+ +---------+ | +----+ +----+
|
||||
* <----------- data blocks ------------> | <--- parity --->
|
||||
*
|
||||
* The class also carries {@link buf}, {@link offsetsInBuf}, and
|
||||
* {@link lengthsInBuf} to define how read task for this chunk should deliver
|
||||
* the returned data.
|
||||
* The class also carries {@link #buf}, {@link #offsetsInBuf}, and
|
||||
* {@link #lengthsInBuf} to define how read task for this chunk should
|
||||
* deliver the returned data.
|
||||
*/
|
||||
public static class StripingChunk {
|
||||
/** Chunk has been successfully fetched */
|
||||
|
|
|
@ -1,143 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class TestPlanReadPortions {
|
||||
|
||||
// We only support this as num of data blocks. It might be good enough for now
|
||||
// for the purpose, even not flexible yet for any number in a schema.
|
||||
private final short GROUP_SIZE = 3;
|
||||
private final int CELLSIZE = 128 * 1024;
|
||||
|
||||
private void testPlanReadPortions(int startInBlk, int length,
|
||||
int bufferOffset, int[] readLengths, int[] offsetsInBlock,
|
||||
int[][] bufferOffsets, int[][] bufferLengths) {
|
||||
ReadPortion[] results = StripedBlockUtil.planReadPortions(GROUP_SIZE,
|
||||
CELLSIZE, startInBlk, length, bufferOffset);
|
||||
assertEquals(GROUP_SIZE, results.length);
|
||||
|
||||
for (int i = 0; i < GROUP_SIZE; i++) {
|
||||
assertEquals(readLengths[i], results[i].getReadLength());
|
||||
assertEquals(offsetsInBlock[i], results[i].getStartOffsetInBlock());
|
||||
final int[] bOffsets = results[i].getOffsets();
|
||||
assertArrayEquals(bufferOffsets[i], bOffsets);
|
||||
final int[] bLengths = results[i].getLengths();
|
||||
assertArrayEquals(bufferLengths[i], bLengths);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test {@link StripedBlockUtil#planReadPortions}
|
||||
*/
|
||||
@Test
|
||||
public void testPlanReadPortions() {
|
||||
/**
|
||||
* start block offset is 0, read cellSize - 10
|
||||
*/
|
||||
testPlanReadPortions(0, CELLSIZE - 10, 0,
|
||||
new int[]{CELLSIZE - 10, 0, 0}, new int[]{0, 0, 0},
|
||||
new int[][]{new int[]{0}, new int[]{}, new int[]{}},
|
||||
new int[][]{new int[]{CELLSIZE - 10}, new int[]{}, new int[]{}});
|
||||
|
||||
/**
|
||||
* start block offset is 0, read 3 * cellSize
|
||||
*/
|
||||
testPlanReadPortions(0, GROUP_SIZE * CELLSIZE, 0,
|
||||
new int[]{CELLSIZE, CELLSIZE, CELLSIZE}, new int[]{0, 0, 0},
|
||||
new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{CELLSIZE * 2}},
|
||||
new int[][]{new int[]{CELLSIZE}, new int[]{CELLSIZE}, new int[]{CELLSIZE}});
|
||||
|
||||
/**
|
||||
* start block offset is 0, read cellSize + 10
|
||||
*/
|
||||
testPlanReadPortions(0, CELLSIZE + 10, 0,
|
||||
new int[]{CELLSIZE, 10, 0}, new int[]{0, 0, 0},
|
||||
new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{}},
|
||||
new int[][]{new int[]{CELLSIZE}, new int[]{10}, new int[]{}});
|
||||
|
||||
/**
|
||||
* start block offset is 0, read 5 * cellSize + 10, buffer start offset is 100
|
||||
*/
|
||||
testPlanReadPortions(0, 5 * CELLSIZE + 10, 100,
|
||||
new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE + 10}, new int[]{0, 0, 0},
|
||||
new int[][]{new int[]{100, 100 + CELLSIZE * GROUP_SIZE},
|
||||
new int[]{100 + CELLSIZE, 100 + CELLSIZE * 4},
|
||||
new int[]{100 + CELLSIZE * 2, 100 + CELLSIZE * 5}},
|
||||
new int[][]{new int[]{CELLSIZE, CELLSIZE},
|
||||
new int[]{CELLSIZE, CELLSIZE},
|
||||
new int[]{CELLSIZE, 10}});
|
||||
|
||||
/**
|
||||
* start block offset is 2, read 3 * cellSize
|
||||
*/
|
||||
testPlanReadPortions(2, GROUP_SIZE * CELLSIZE, 100,
|
||||
new int[]{CELLSIZE, CELLSIZE, CELLSIZE},
|
||||
new int[]{2, 0, 0},
|
||||
new int[][]{new int[]{100, 100 + GROUP_SIZE * CELLSIZE - 2},
|
||||
new int[]{100 + CELLSIZE - 2},
|
||||
new int[]{100 + CELLSIZE * 2 - 2}},
|
||||
new int[][]{new int[]{CELLSIZE - 2, 2},
|
||||
new int[]{CELLSIZE},
|
||||
new int[]{CELLSIZE}});
|
||||
|
||||
/**
|
||||
* start block offset is 2, read 3 * cellSize + 10
|
||||
*/
|
||||
testPlanReadPortions(2, GROUP_SIZE * CELLSIZE + 10, 0,
|
||||
new int[]{CELLSIZE + 10, CELLSIZE, CELLSIZE},
|
||||
new int[]{2, 0, 0},
|
||||
new int[][]{new int[]{0, GROUP_SIZE * CELLSIZE - 2},
|
||||
new int[]{CELLSIZE - 2},
|
||||
new int[]{CELLSIZE * 2 - 2}},
|
||||
new int[][]{new int[]{CELLSIZE - 2, 12},
|
||||
new int[]{CELLSIZE},
|
||||
new int[]{CELLSIZE}});
|
||||
|
||||
/**
|
||||
* start block offset is cellSize * 2 - 1, read 5 * cellSize + 10
|
||||
*/
|
||||
testPlanReadPortions(CELLSIZE * 2 - 1, 5 * CELLSIZE + 10, 0,
|
||||
new int[]{CELLSIZE * 2, CELLSIZE + 10, CELLSIZE * 2},
|
||||
new int[]{CELLSIZE, CELLSIZE - 1, 0},
|
||||
new int[][]{new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1},
|
||||
new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1},
|
||||
new int[]{1, 3 * CELLSIZE + 1}},
|
||||
new int[][]{new int[]{CELLSIZE, CELLSIZE},
|
||||
new int[]{1, CELLSIZE, 9},
|
||||
new int[]{CELLSIZE, CELLSIZE}});
|
||||
|
||||
/**
|
||||
* start block offset is cellSize * 6 - 1, read 7 * cellSize + 10
|
||||
*/
|
||||
testPlanReadPortions(CELLSIZE * 6 - 1, 7 * CELLSIZE + 10, 0,
|
||||
new int[]{CELLSIZE * 3, CELLSIZE * 2 + 9, CELLSIZE * 2 + 1},
|
||||
new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE * 2 - 1},
|
||||
new int[][]{new int[]{1, 3 * CELLSIZE + 1, 6 * CELLSIZE + 1},
|
||||
new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1, 7 * CELLSIZE + 1},
|
||||
new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1}},
|
||||
new int[][]{new int[]{CELLSIZE, CELLSIZE, CELLSIZE},
|
||||
new int[]{CELLSIZE, CELLSIZE, 9},
|
||||
new int[]{1, CELLSIZE, CELLSIZE}});
|
||||
}
|
||||
}
|
|
@ -189,13 +189,13 @@ public class TestRecoverStripedFile {
|
|||
deadDnIndices[i] = dnMap.get(dataDNs[i]);
|
||||
|
||||
// Check the block replica file on deadDn before it dead.
|
||||
blocks[i] = StripedBlockUtil.constructStripedBlock(
|
||||
blocks[i] = StripedBlockUtil.constructInternalBlock(
|
||||
lastBlock.getBlock(), cellSize, dataBlkNum, indices[toDead[i]]);
|
||||
replicas[i] = cluster.getBlockFile(deadDnIndices[i], blocks[i]);
|
||||
metadatas[i] = cluster.getBlockMetadataFile(deadDnIndices[i], blocks[i]);
|
||||
// the block replica on the datanode should be the same as expected
|
||||
assertEquals(replicas[i].length(),
|
||||
StripedBlockUtil.getStripedBlockLength(
|
||||
StripedBlockUtil.getInternalBlockLength(
|
||||
lastBlock.getBlockSize(), cellSize, dataBlkNum, indices[toDead[i]]));
|
||||
assertTrue(metadatas[i].getName().
|
||||
endsWith(blocks[i].getGenerationStamp() + ".meta"));
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.hdfs.util;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
|
@ -26,26 +27,107 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
|
||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.parseStripedBlockGroup;
|
||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
|
||||
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.*;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
|
||||
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Random;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
||||
/**
|
||||
* Need to cover the following combinations:
|
||||
* 1. Block group size:
|
||||
* 1.1 One byte
|
||||
* 1.2 Smaller than cell
|
||||
* 1.3 One full cell
|
||||
* 1.4 x full cells, where x is smaller than number of data blocks
|
||||
* 1.5 x full cells plus a partial cell
|
||||
* 1.6 One full stripe
|
||||
* 1.7 One full stripe plus a partial cell
|
||||
* 1.8 One full stripe plus x full cells
|
||||
* 1.9 One full stripe plus x full cells plus a partial cell
|
||||
* 1.10 y full stripes, but smaller than full block group size
|
||||
* 1.11 Full block group size
|
||||
*
|
||||
* 2. Byte range start
|
||||
* 2.1 Zero
|
||||
* 2.2 Within first cell
|
||||
* 2.3 End of first cell
|
||||
* 2.4 Start of a middle* cell in the first stripe (* neither first or last)
|
||||
* 2.5 End of middle cell in the first stripe
|
||||
* 2.6 Within a middle cell in the first stripe
|
||||
* 2.7 Start of the last cell in the first stripe
|
||||
* 2.8 Within the last cell in the first stripe
|
||||
* 2.9 End of the last cell in the first stripe
|
||||
* 2.10 Start of a middle stripe
|
||||
* 2.11 Within a middle stripe
|
||||
* 2.12 End of a middle stripe
|
||||
* 2.13 Start of the last stripe
|
||||
* 2.14 Within the last stripe
|
||||
* 2.15 End of the last stripe (last byte)
|
||||
*
|
||||
* 3. Byte range length: same settings as block group size
|
||||
*
|
||||
* We should test in total 11 x 15 x 11 = 1815 combinations
|
||||
* TODO: test parity block logic
|
||||
*/
|
||||
public class TestStripedBlockUtil {
|
||||
private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
|
||||
private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
|
||||
private final short BLK_GROUP_SIZE = DATA_BLK_NUM + PARITY_BLK_NUM;
|
||||
private final short BLK_GROUP_WIDTH = DATA_BLK_NUM + PARITY_BLK_NUM;
|
||||
private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
|
||||
private final int FULL_STRIPE_SIZE = DATA_BLK_NUM * CELLSIZE;
|
||||
/** number of full stripes in a full block group */
|
||||
private final int BLK_GROUP_STRIPE_NUM = 16;
|
||||
private final ECSchema SCEHMA = ErasureCodingSchemaManager.
|
||||
getSystemDefaultSchema();
|
||||
private final Random random = new Random();
|
||||
|
||||
private LocatedStripedBlock createDummyLocatedBlock() {
|
||||
private int[] blockGroupSizes;
|
||||
private int[] byteRangeStartOffsets;
|
||||
private int[] byteRangeSizes;
|
||||
|
||||
@Before
|
||||
public void setup(){
|
||||
blockGroupSizes = new int[]{1, getDelta(CELLSIZE), CELLSIZE,
|
||||
getDelta(DATA_BLK_NUM) * CELLSIZE,
|
||||
getDelta(DATA_BLK_NUM) * CELLSIZE + getDelta(CELLSIZE),
|
||||
FULL_STRIPE_SIZE, FULL_STRIPE_SIZE + getDelta(CELLSIZE),
|
||||
FULL_STRIPE_SIZE + getDelta(DATA_BLK_NUM) * CELLSIZE,
|
||||
FULL_STRIPE_SIZE + getDelta(DATA_BLK_NUM) * CELLSIZE + getDelta(CELLSIZE),
|
||||
getDelta(BLK_GROUP_STRIPE_NUM) * FULL_STRIPE_SIZE,
|
||||
BLK_GROUP_STRIPE_NUM * FULL_STRIPE_SIZE};
|
||||
byteRangeStartOffsets = new int[] {0, getDelta(CELLSIZE), CELLSIZE - 1};
|
||||
byteRangeSizes = new int[]{1, getDelta(CELLSIZE), CELLSIZE,
|
||||
getDelta(DATA_BLK_NUM) * CELLSIZE,
|
||||
getDelta(DATA_BLK_NUM) * CELLSIZE + getDelta(CELLSIZE),
|
||||
FULL_STRIPE_SIZE, FULL_STRIPE_SIZE + getDelta(CELLSIZE),
|
||||
FULL_STRIPE_SIZE + getDelta(DATA_BLK_NUM) * CELLSIZE,
|
||||
FULL_STRIPE_SIZE + getDelta(DATA_BLK_NUM) * CELLSIZE + getDelta(CELLSIZE),
|
||||
getDelta(BLK_GROUP_STRIPE_NUM) * FULL_STRIPE_SIZE,
|
||||
BLK_GROUP_STRIPE_NUM * FULL_STRIPE_SIZE};
|
||||
}
|
||||
|
||||
private int getDelta(int size) {
|
||||
return 1 + random.nextInt(size - 2);
|
||||
}
|
||||
private byte hashIntToByte(int i) {
|
||||
int BYTE_MASK = 0xff;
|
||||
return (byte) (((i + 13) * 29) & BYTE_MASK);
|
||||
}
|
||||
|
||||
private LocatedStripedBlock createDummyLocatedBlock(int bgSize) {
|
||||
final long blockGroupID = -1048576;
|
||||
DatanodeInfo[] locs = new DatanodeInfo[BLK_GROUP_SIZE];
|
||||
String[] storageIDs = new String[BLK_GROUP_SIZE];
|
||||
StorageType[] storageTypes = new StorageType[BLK_GROUP_SIZE];
|
||||
int[] indices = new int[BLK_GROUP_SIZE];
|
||||
for (int i = 0; i < BLK_GROUP_SIZE; i++) {
|
||||
DatanodeInfo[] locs = new DatanodeInfo[BLK_GROUP_WIDTH];
|
||||
String[] storageIDs = new String[BLK_GROUP_WIDTH];
|
||||
StorageType[] storageTypes = new StorageType[BLK_GROUP_WIDTH];
|
||||
int[] indices = new int[BLK_GROUP_WIDTH];
|
||||
for (int i = 0; i < BLK_GROUP_WIDTH; i++) {
|
||||
indices[i] = (i + 2) % DATA_BLK_NUM;
|
||||
// Location port always equal to logical index of a block,
|
||||
// for easier verification
|
||||
|
@ -53,13 +135,40 @@ public class TestStripedBlockUtil {
|
|||
storageIDs[i] = locs[i].getDatanodeUuid();
|
||||
storageTypes[i] = StorageType.DISK;
|
||||
}
|
||||
return new LocatedStripedBlock(new ExtendedBlock("pool", blockGroupID),
|
||||
locs, storageIDs, storageTypes, indices, 0, false, null);
|
||||
return new LocatedStripedBlock(new ExtendedBlock("pool", blockGroupID,
|
||||
bgSize, 1001), locs, storageIDs, storageTypes, indices, 0, false,
|
||||
null);
|
||||
}
|
||||
|
||||
private byte[][] createInternalBlkBuffers(int bgSize) {
|
||||
byte[][] bufs = new byte[DATA_BLK_NUM + PARITY_BLK_NUM][];
|
||||
int[] pos = new int[DATA_BLK_NUM + PARITY_BLK_NUM];
|
||||
for (int i = 0; i < DATA_BLK_NUM + PARITY_BLK_NUM; i++) {
|
||||
int bufSize = (int) getInternalBlockLength(
|
||||
bgSize, CELLSIZE, DATA_BLK_NUM, i);
|
||||
bufs[i] = new byte[bufSize];
|
||||
pos[i] = 0;
|
||||
}
|
||||
int done = 0;
|
||||
while (done < bgSize) {
|
||||
Preconditions.checkState(done % CELLSIZE == 0);
|
||||
StripingCell cell = new StripingCell(SCEHMA, done / CELLSIZE);
|
||||
int idxInStripe = cell.idxInStripe;
|
||||
int size = Math.min(CELLSIZE, bgSize - done);
|
||||
for (int i = 0; i < size; i++) {
|
||||
bufs[idxInStripe][pos[idxInStripe] + i] = hashIntToByte(done + i);
|
||||
}
|
||||
done += size;
|
||||
pos[idxInStripe] += size;
|
||||
}
|
||||
|
||||
return bufs;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseDummyStripedBlock() {
|
||||
LocatedStripedBlock lsb = createDummyLocatedBlock();
|
||||
LocatedStripedBlock lsb = createDummyLocatedBlock(
|
||||
BLK_GROUP_STRIPE_NUM * FULL_STRIPE_SIZE);
|
||||
LocatedBlock[] blocks = parseStripedBlockGroup(
|
||||
lsb, CELLSIZE, DATA_BLK_NUM, PARITY_BLK_NUM);
|
||||
assertEquals(DATA_BLK_NUM + PARITY_BLK_NUM, blocks.length);
|
||||
|
@ -68,14 +177,15 @@ public class TestStripedBlockUtil {
|
|||
assertEquals(i,
|
||||
BlockIdManager.getBlockIndex(blocks[i].getBlock().getLocalBlock()));
|
||||
assertEquals(i * CELLSIZE, blocks[i].getStartOffset());
|
||||
/** TODO: properly define {@link LocatedBlock#offset} for internal blocks */
|
||||
assertEquals(1, blocks[i].getLocations().length);
|
||||
assertEquals(i, blocks[i].getLocations()[0].getIpcPort());
|
||||
assertEquals(i, blocks[i].getLocations()[0].getXferPort());
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyInternalBlocks (long numBytesInGroup, long[] expected) {
|
||||
for (int i = 1; i < BLK_GROUP_SIZE; i++) {
|
||||
private void verifyInternalBlocks (int numBytesInGroup, int[] expected) {
|
||||
for (int i = 1; i < BLK_GROUP_WIDTH; i++) {
|
||||
assertEquals(expected[i],
|
||||
getInternalBlockLength(numBytesInGroup, CELLSIZE, DATA_BLK_NUM, i));
|
||||
}
|
||||
|
@ -85,41 +195,85 @@ public class TestStripedBlockUtil {
|
|||
public void testGetInternalBlockLength () {
|
||||
// A small delta that is smaller than a cell
|
||||
final int delta = 10;
|
||||
assert delta < CELLSIZE;
|
||||
|
||||
// Block group is smaller than a cell
|
||||
verifyInternalBlocks(CELLSIZE - delta,
|
||||
new long[] {CELLSIZE - delta, 0, 0, 0, 0, 0,
|
||||
new int[] {CELLSIZE - delta, 0, 0, 0, 0, 0,
|
||||
CELLSIZE - delta, CELLSIZE - delta, CELLSIZE - delta});
|
||||
|
||||
// Block group is exactly as large as a cell
|
||||
verifyInternalBlocks(CELLSIZE,
|
||||
new long[] {CELLSIZE, 0, 0, 0, 0, 0,
|
||||
new int[] {CELLSIZE, 0, 0, 0, 0, 0,
|
||||
CELLSIZE, CELLSIZE, CELLSIZE});
|
||||
|
||||
// Block group is a little larger than a cell
|
||||
verifyInternalBlocks(CELLSIZE + delta,
|
||||
new long[] {CELLSIZE, delta, 0, 0, 0, 0,
|
||||
new int[] {CELLSIZE, delta, 0, 0, 0, 0,
|
||||
CELLSIZE, CELLSIZE, CELLSIZE});
|
||||
|
||||
// Block group contains multiple stripes and ends at stripe boundary
|
||||
verifyInternalBlocks(2 * DATA_BLK_NUM * CELLSIZE,
|
||||
new long[] {2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
|
||||
new int[] {2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
|
||||
2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
|
||||
2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE});
|
||||
|
||||
// Block group contains multiple stripes and ends at cell boundary
|
||||
// (not ending at stripe boundary)
|
||||
verifyInternalBlocks(2 * DATA_BLK_NUM * CELLSIZE + CELLSIZE,
|
||||
new long[] {3 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
|
||||
new int[] {3 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
|
||||
2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
|
||||
3 * CELLSIZE, 3 * CELLSIZE, 3 * CELLSIZE});
|
||||
|
||||
// Block group contains multiple stripes and doesn't end at cell boundary
|
||||
verifyInternalBlocks(2 * DATA_BLK_NUM * CELLSIZE - delta,
|
||||
new long[] {2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
|
||||
new int[] {2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE,
|
||||
2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE - delta,
|
||||
2 * CELLSIZE, 2 * CELLSIZE, 2 * CELLSIZE});
|
||||
}
|
||||
|
||||
/**
|
||||
* Test dividing a byte range into aligned stripes and verify the aligned
|
||||
* ranges can be translated back to the byte range.
|
||||
*/
|
||||
@Test
|
||||
public void testDivideByteRangeIntoStripes() {
|
||||
byte[] assembled = new byte[BLK_GROUP_STRIPE_NUM * FULL_STRIPE_SIZE];
|
||||
for (int bgSize : blockGroupSizes) {
|
||||
LocatedStripedBlock blockGroup = createDummyLocatedBlock(bgSize);
|
||||
byte[][] internalBlkBufs = createInternalBlkBuffers(bgSize);
|
||||
for (int brStart : byteRangeStartOffsets) {
|
||||
for (int brSize : byteRangeSizes) {
|
||||
if (brStart + brSize > bgSize) {
|
||||
continue;
|
||||
}
|
||||
AlignedStripe[] stripes = divideByteRangeIntoStripes(SCEHMA,
|
||||
blockGroup, brStart, brStart + brSize - 1, assembled, 0);
|
||||
|
||||
for (AlignedStripe stripe : stripes) {
|
||||
for (int i = 0; i < DATA_BLK_NUM; i++) {
|
||||
StripingChunk chunk = stripe.chunks[i];
|
||||
if (chunk == null || chunk.state != StripingChunk.REQUESTED) {
|
||||
continue;
|
||||
}
|
||||
int done = 0;
|
||||
for (int j = 0; j < chunk.getLengths().length; j++) {
|
||||
System.arraycopy(internalBlkBufs[i],
|
||||
(int) stripe.getOffsetInBlock() + done, assembled,
|
||||
chunk.getOffsets()[j], chunk.getLengths()[j]);
|
||||
done += chunk.getLengths()[j];
|
||||
}
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < brSize; i++) {
|
||||
if (hashIntToByte(brStart + i) != assembled[i]) {
|
||||
System.out.println("Oops");
|
||||
}
|
||||
assertEquals("Byte at " + (brStart + i) + " should be the same",
|
||||
hashIntToByte(brStart + i), assembled[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue