HADOOP-12040. Adjust inputs order for the decode API in raw erasure coder. (Kai Zheng via yliu)

yliu 2015-10-28 16:18:23 +08:00
parent 6ff6663f64
commit c201cf951d
12 changed files with 154 additions and 116 deletions

File: CHANGES.txt (hadoop-common)

@@ -922,6 +922,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-11685. StorageException complaining " no lease ID" during HBase
distributed log splitting (Duo Xu via cnauroth)
+ HADOOP-12040. Adjust inputs order for the decode API in raw erasure coder.
+ (Kai Zheng via yliu)
OPTIMIZATIONS
HADOOP-11785. Reduce the number of listStatus operation in distcp
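
In brief, this change flips the raw decoder's calling convention from parity-units-first to data-units-first, so decode-time indexes line up with the natural block order. A minimal illustrative sketch of the two layouts for RS(6, 3); the class and labels below are invented for illustration, not part of the patch:

    // Illustrative only: the before/after input layouts for RS(6, 3).
    public class InputOrderSketch {
      public static void main(String[] args) {
        String[] parityFirst = {"p0", "p1", "p2", "d0", "d1", "d2", "d3", "d4", "d5"};
        String[] dataFirst   = {"d0", "d1", "d2", "d3", "d4", "d5", "p0", "p1", "p2"};
        System.out.println("old decode input order: " + String.join(" ", parityFirst));
        System.out.println("new decode input order: " + String.join(" ", dataFirst));
      }
    }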

File: AbstractErasureDecoder.java

@@ -59,13 +59,14 @@ public abstract class AbstractErasureDecoder extends AbstractErasureCoder {
* @return
*/
protected ECBlock[] getInputBlocks(ECBlockGroup blockGroup) {
- ECBlock[] inputBlocks = new ECBlock[getNumParityUnits()
- + getNumDataUnits()];
+ ECBlock[] inputBlocks = new ECBlock[getNumDataUnits() +
+ getNumParityUnits()];
- System.arraycopy(blockGroup.getParityBlocks(), 0, inputBlocks, 0,
- getNumParityUnits());
System.arraycopy(blockGroup.getDataBlocks(), 0, inputBlocks,
- getNumParityUnits(), getNumDataUnits());
+ 0, getNumDataUnits());
+ System.arraycopy(blockGroup.getParityBlocks(), 0, inputBlocks,
+ getNumDataUnits(), getNumParityUnits());
return inputBlocks;
}
@@ -80,18 +81,18 @@ public abstract class AbstractErasureDecoder extends AbstractErasureCoder {
int idx = 0;
- for (int i = 0; i < getNumParityUnits(); i++) {
- if (blockGroup.getParityBlocks()[i].isErased()) {
- outputBlocks[idx++] = blockGroup.getParityBlocks()[i];
- }
- }
for (int i = 0; i < getNumDataUnits(); i++) {
if (blockGroup.getDataBlocks()[i].isErased()) {
outputBlocks[idx++] = blockGroup.getDataBlocks()[i];
}
}
+ for (int i = 0; i < getNumParityUnits(); i++) {
+ if (blockGroup.getParityBlocks()[i].isErased()) {
+ outputBlocks[idx++] = blockGroup.getParityBlocks()[i];
+ }
+ }
return outputBlocks;
}
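
With both loops swapped, inputs and outputs follow the same data-first rule. A standalone sketch of the new getOutputBlocks ordering, assuming RS(6, 3) with d2 and p1 erased; the boolean arrays stand in for isErased() and are illustrative only:

    import java.util.ArrayList;
    import java.util.List;

    public class OutputOrderSketch {
      public static void main(String[] args) {
        boolean[] dataErased   = {false, false, true, false, false, false}; // d2 erased
        boolean[] parityErased = {false, true, false};                      // p1 erased

        List<String> outputs = new ArrayList<>();
        for (int i = 0; i < dataErased.length; i++) {   // data blocks come first now
          if (dataErased[i]) outputs.add("d" + i);
        }
        for (int i = 0; i < parityErased.length; i++) { // parity blocks follow
          if (parityErased[i]) outputs.add("p" + i);
        }
        System.out.println(outputs); // [d2, p1], previously [p1, d2]
      }
    }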

File: AbstractRawErasureCoder.java

@@ -34,10 +34,12 @@ public abstract class AbstractRawErasureCoder
private final int numDataUnits;
private final int numParityUnits;
+ private final int numAllUnits;
public AbstractRawErasureCoder(int numDataUnits, int numParityUnits) {
this.numDataUnits = numDataUnits;
this.numParityUnits = numParityUnits;
+ this.numAllUnits = numDataUnits + numParityUnits;
}
@Override
@@ -50,6 +52,10 @@ public abstract class AbstractRawErasureCoder
return numParityUnits;
}
+ protected int getNumAllUnits() {
+ return numAllUnits;
+ }
@Override
public boolean preferDirectBuffer() {
return false;
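
The cached numAllUnits gives subclasses a single k + m total. One plausible use, sketched below with a hypothetical checkInputCount helper that is not part of this hunk, is validating that callers pass exactly one buffer per unit:

    // Hypothetical sketch; only getNumAllUnits() comes from the patch above.
    class BufferCountCheck {
      private final int numAllUnits;
      BufferCountCheck(int numDataUnits, int numParityUnits) {
        this.numAllUnits = numDataUnits + numParityUnits;
      }
      int getNumAllUnits() { return numAllUnits; }
      void checkInputCount(Object[] inputs) {
        if (inputs.length != getNumAllUnits()) {
          throw new IllegalArgumentException("expected " + getNumAllUnits()
              + " buffers (data + parity), got " + inputs.length);
        }
      }
      public static void main(String[] args) {
        new BufferCountCheck(6, 3).checkInputCount(new Object[9]); // passes: 6 + 3 = 9
      }
    }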

File: RSRawDecoder.java

@@ -72,6 +72,35 @@ public class RSRawDecoder extends AbstractRawErasureDecoder {
numParityUnits);
}
+ @Override
+ public void decode(ByteBuffer[] inputs, int[] erasedIndexes,
+ ByteBuffer[] outputs) {
+ // Make copies to avoid affecting the original ones.
+ ByteBuffer[] newInputs = new ByteBuffer[inputs.length];
+ int[] newErasedIndexes = new int[erasedIndexes.length];
+ ByteBuffer[] newOutputs = new ByteBuffer[outputs.length];
+ // Adjust the order to match the underlying requirements.
+ adjustOrder(inputs, newInputs,
+ erasedIndexes, newErasedIndexes, outputs, newOutputs);
+ super.decode(newInputs, newErasedIndexes, newOutputs);
+ }
+ @Override
+ public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs) {
+ // Make copies to avoid affecting the original ones.
+ byte[][] newInputs = new byte[inputs.length][];
+ int[] newErasedIndexes = new int[erasedIndexes.length];
+ byte[][] newOutputs = new byte[outputs.length][];
+ // Adjust the order to match the underlying requirements.
+ adjustOrder(inputs, newInputs,
+ erasedIndexes, newErasedIndexes, outputs, newOutputs);
+ super.decode(newInputs, newErasedIndexes, newOutputs);
+ }
private void doDecodeImpl(ByteBuffer[] inputs, int[] erasedIndexes,
ByteBuffer[] outputs) {
ByteBuffer valid = findFirstValidInput(inputs);
@@ -95,7 +124,7 @@ public class RSRawDecoder extends AbstractRawErasureDecoder {
}
RSUtil.GF.solveVandermondeSystem(errSignature, outputs, outputOffsets,
erasedIndexes.length, dataLen);
}
@Override
@@ -146,7 +175,7 @@ public class RSRawDecoder extends AbstractRawErasureDecoder {
}
doDecodeImpl(inputs, inputOffsets, dataLen, erasedOrNotToReadIndexes,
adjustedByteArrayOutputsParameter, adjustedOutputOffsets);
}
@Override
@@ -200,6 +229,42 @@ public class RSRawDecoder extends AbstractRawErasureDecoder {
adjustedDirectBufferOutputsParameter);
}
+ /*
+ * Convert from data-units-first order to parity-units-first order.
+ */
+ private <T> void adjustOrder(T[] inputs, T[] inputs2,
+ int[] erasedIndexes, int[] erasedIndexes2,
+ T[] outputs, T[] outputs2) {
+ // Example:
+ // d0 d1 d2 d3 d4 d5 : p0 p1 p2 => p0 p1 p2 : d0 d1 d2 d3 d4 d5
+ System.arraycopy(inputs, getNumDataUnits(), inputs2,
+ 0, getNumParityUnits());
+ System.arraycopy(inputs, 0, inputs2,
+ getNumParityUnits(), getNumDataUnits());
+ int numErasedDataUnits = 0, numErasedParityUnits = 0;
+ int idx = 0;
+ for (int i = 0; i < erasedIndexes.length; i++) {
+ if (erasedIndexes[i] >= getNumDataUnits()) {
+ erasedIndexes2[idx++] = erasedIndexes[i] - getNumDataUnits();
+ numErasedParityUnits++;
+ }
+ }
+ for (int i = 0; i < erasedIndexes.length; i++) {
+ if (erasedIndexes[i] < getNumDataUnits()) {
+ erasedIndexes2[idx++] = erasedIndexes[i] + getNumParityUnits();
+ numErasedDataUnits++;
+ }
+ }
+ // Move the output buffers for erased parity units to the front ...
+ System.arraycopy(outputs, numErasedDataUnits, outputs2,
+ 0, numErasedParityUnits);
+ // ... and those for erased data units after them.
+ System.arraycopy(outputs, 0, outputs2,
+ numErasedParityUnits, numErasedDataUnits);
+ }
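
Concretely: for RS(6, 3) with d2 and p1 erased, the data-first erasedIndexes = [2, 7] becomes [1, 5] in the legacy parity-first order that the underlying decoder still expects (p1 shifts to 7 - 6 = 1; d2 shifts to 2 + 3 = 5). A self-contained sketch of the same index arithmetic, with the unit counts hard-coded for illustration:

    import java.util.Arrays;

    public class AdjustOrderSketch {
      public static void main(String[] args) {
        final int numDataUnits = 6, numParityUnits = 3;
        int[] erasedIndexes = {2, 7};        // d2 and p1 in data-first order
        int[] remapped = new int[erasedIndexes.length];
        int idx = 0;
        for (int e : erasedIndexes) {        // parity erasures move to the front
          if (e >= numDataUnits) remapped[idx++] = e - numDataUnits;
        }
        for (int e : erasedIndexes) {        // data erasures shift past the parity units
          if (e < numDataUnits) remapped[idx++] = e + numParityUnits;
        }
        System.out.println(Arrays.toString(remapped)); // [1, 5]
      }
    }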
private byte[] checkGetBytesArrayBuffer(int idx, int bufferLen) {
if (bytesArrayBuffers[idx] == null ||
bytesArrayBuffers[idx].length < bufferLen) {

File: RawErasureDecoder.java

@@ -35,8 +35,8 @@ public interface RawErasureDecoder extends RawErasureCoder {
/**
* Decode with inputs and erasedIndexes, generates outputs.
* How to prepare for inputs:
- * 1. Create an array containing parity units + data units. Please note the
- *    parity units should be first or before the data units.
+ * 1. Create an array containing data units + parity units. Please note the
+ *    data units should be first or before the parity units.
* 2. Set null in the array locations specified via erasedIndexes to indicate
*    they're erased and no data are to read from;
* 3. Set null in the array locations for extra redundant items, as they're
@@ -47,8 +47,8 @@ public interface RawErasureDecoder extends RawErasureCoder {
* For an example using RS (6, 3), assuming sources (d0, d1, d2, d3, d4, d5)
* and parities (p0, p1, p2), d2 being erased. We can and may want to use only
* 6 units like (d1, d3, d4, d5, p0, p2) to recover d2. We will have:
- *     inputs = [p0, null(p1), p2, null(d0), d1, null(d2), d3, d4, d5]
- *     erasedIndexes = [5] // index of d2 into inputs array
+ *     inputs = [null(d0), d1, null(d2), d3, d4, d5, p0, null(p1), p2]
+ *     erasedIndexes = [2] // index of d2 into inputs array
*     outputs = [a-writable-buffer]
*
* Note, for both inputs and outputs, no mixing of on-heap buffers and direct
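
The javadoc's RS(6, 3) scenario maps onto a call like the hedged sketch below. The array layout, the CodecUtil.createRSRawDecoder factory, and the decode signature all appear elsewhere in this patch; the chunk size, buffer setup, and the Configuration conf assumed in scope are illustrative:

    // Sketch: recover d2 from (d1, d3, d4, d5, p0, p2); assumes conf is a
    // Configuration and real cell data fills the non-null buffers.
    int chunkSize = 1024;
    byte[][] inputs = new byte[9][];
    for (int i : new int[] {1, 3, 4, 5, 6, 8}) { // d1, d3, d4, d5, p0, p2
      inputs[i] = new byte[chunkSize];
    }
    // inputs == [null(d0), d1, null(d2), d3, d4, d5, p0, null(p1), p2]
    int[] erasedIndexes = {2};                      // d2's slot in the data-first layout
    byte[][] outputs = {new byte[chunkSize]};       // one writable buffer per erasure
    RawErasureDecoder decoder = CodecUtil.createRSRawDecoder(conf, 6, 3);
    decoder.decode(inputs, erasedIndexes, outputs); // outputs[0] receives d2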

File: TestCoderBase.java

@@ -40,7 +40,7 @@ public abstract class TestCoderBase {
private Configuration conf;
protected int numDataUnits;
protected int numParityUnits;
- protected int baseChunkSize = 513;
+ protected int baseChunkSize = 1024;
private int chunkSize = baseChunkSize;
private BufferAllocator allocator;
@@ -165,7 +165,9 @@ public abstract class TestCoderBase {
byte[][] erased = toArrays(erasedChunks);
byte[][] recovered = toArrays(recoveredChunks);
boolean result = Arrays.deepEquals(erased, recovered);
- assertTrue("Decoding and comparing failed.", result);
+ if (!result) {
+ assertTrue("Decoding and comparing failed.", result);
+ }
}
/**
@@ -175,39 +177,41 @@
*/
protected int[] getErasedIndexesForDecoding() {
int[] erasedIndexesForDecoding =
- new int[erasedParityIndexes.length + erasedDataIndexes.length];
+ new int[erasedDataIndexes.length + erasedParityIndexes.length];
int idx = 0;
- for (int i = 0; i < erasedParityIndexes.length; i++) {
- erasedIndexesForDecoding[idx ++] = erasedParityIndexes[i];
+ for (int i = 0; i < erasedDataIndexes.length; i++) {
+ erasedIndexesForDecoding[idx ++] = erasedDataIndexes[i];
}
- for (int i = 0; i < erasedDataIndexes.length; i++) {
- erasedIndexesForDecoding[idx ++] = erasedDataIndexes[i] + numParityUnits;
+ for (int i = 0; i < erasedParityIndexes.length; i++) {
+ erasedIndexesForDecoding[idx ++] = erasedParityIndexes[i] + numDataUnits;
}
return erasedIndexesForDecoding;
}
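
For instance, with numDataUnits = 6, erasedDataIndexes = {2} and erasedParityIndexes = {1}, the method now returns {2, 7}; before the patch it returned {1, 5} (parity first, data offset by numParityUnits). A compact, self-contained sketch of the new computation:

    import java.util.Arrays;

    public class ErasedIndexesSketch {
      public static void main(String[] args) {
        final int numDataUnits = 6;
        int[] erasedDataIndexes = {2};
        int[] erasedParityIndexes = {1};

        int[] out = new int[erasedDataIndexes.length + erasedParityIndexes.length];
        int idx = 0;
        for (int i : erasedDataIndexes) out[idx++] = i;                  // positions unchanged
        for (int i : erasedParityIndexes) out[idx++] = i + numDataUnits; // shifted past data
        System.out.println(Arrays.toString(out)); // [2, 7]
      }
    }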
/**
- * Return input chunks for decoding, which is parityChunks + dataChunks.
+ * Return input chunks for decoding, which is dataChunks + parityChunks.
* @param dataChunks
* @param parityChunks
* @return
*/
protected ECChunk[] prepareInputChunksForDecoding(ECChunk[] dataChunks,
ECChunk[] parityChunks) {
- ECChunk[] inputChunks = new ECChunk[numParityUnits + numDataUnits];
+ ECChunk[] inputChunks = new ECChunk[numDataUnits + numParityUnits];
int idx = 0;
- for (int i = 0; i < numParityUnits; i++) {
- inputChunks[idx ++] = parityChunks[i];
- }
for (int i = 0; i < numDataUnits; i++) {
inputChunks[idx ++] = dataChunks[i];
}
+ for (int i = 0; i < numParityUnits; i++) {
+ inputChunks[idx ++] = parityChunks[i];
+ }
return inputChunks;
}
@@ -221,21 +225,21 @@
*/
protected ECChunk[] backupAndEraseChunks(ECChunk[] dataChunks,
ECChunk[] parityChunks) {
- ECChunk[] toEraseChunks = new ECChunk[erasedParityIndexes.length +
- erasedDataIndexes.length];
+ ECChunk[] toEraseChunks = new ECChunk[erasedDataIndexes.length +
+ erasedParityIndexes.length];
int idx = 0;
- for (int i = 0; i < erasedParityIndexes.length; i++) {
- toEraseChunks[idx ++] = parityChunks[erasedParityIndexes[i]];
- parityChunks[erasedParityIndexes[i]] = null;
- }
for (int i = 0; i < erasedDataIndexes.length; i++) {
toEraseChunks[idx ++] = dataChunks[erasedDataIndexes[i]];
dataChunks[erasedDataIndexes[i]] = null;
}
+ for (int i = 0; i < erasedParityIndexes.length; i++) {
+ toEraseChunks[idx ++] = parityChunks[erasedParityIndexes[i]];
+ parityChunks[erasedParityIndexes[i]] = null;
+ }
return toEraseChunks;
}

File: TestErasureCoderBase.java

@@ -235,14 +235,14 @@ public abstract class TestErasureCoderBase extends TestCoderBase {
int idx = 0;
TestBlock block;
- for (int i = 0; i < erasedParityIndexes.length; i++) {
- block = parityBlocks[erasedParityIndexes[i]];
+ for (int i = 0; i < erasedDataIndexes.length; i++) {
+ block = dataBlocks[erasedDataIndexes[i]];
toEraseBlocks[idx ++] = cloneBlockWithData(block);
eraseDataFromBlock(block);
}
- for (int i = 0; i < erasedDataIndexes.length; i++) {
- block = dataBlocks[erasedDataIndexes[i]];
+ for (int i = 0; i < erasedParityIndexes.length; i++) {
+ block = parityBlocks[erasedParityIndexes[i]];
toEraseBlocks[idx ++] = cloneBlockWithData(block);
eraseDataFromBlock(block);
}

File: TestRSErasureCoder.java

@@ -33,9 +33,6 @@ public class TestRSErasureCoder extends TestErasureCoderBase {
this.encoderClass = RSErasureEncoder.class;
this.decoderClass = RSErasureDecoder.class;
- this.numDataUnits = 10;
- this.numParityUnits = 1;
- this.numChunksInBlock = 10;
}
@@ -119,8 +116,8 @@ public class TestRSErasureCoder extends TestErasureCoderBase {
}
@Test
- public void testCodingDirectBuffer_3x3_erasing_d0_p0() {
- prepare(null, 3, 3, new int[] {0}, new int[] {0});
+ public void testCodingDirectBuffer_6x3_erasing_d0_p0() {
+ prepare(null, 6, 3, new int[] {0}, new int[] {0});
testCoding(true);
}
}

File: DFSStripedInputStream.java

@@ -825,9 +825,7 @@ public class DFSStripedInputStream extends DFSInputStream {
boolean prepareParityChunk(int index) {
Preconditions.checkState(index >= dataBlkNum &&
alignedStripe.chunks[index] == null);
- final int decodeIndex = StripedBlockUtil.convertIndex4Decode(index,
- dataBlkNum, parityBlkNum);
- alignedStripe.chunks[index] = new StripingChunk(decodeInputs[decodeIndex]);
+ alignedStripe.chunks[index] = new StripingChunk(decodeInputs[index]);
alignedStripe.chunks[index].addByteArraySlice(0,
(int) alignedStripe.getSpanInBlock());
return true;
@ -835,8 +833,7 @@ public class DFSStripedInputStream extends DFSInputStream {
@Override
void decode() {
StripedBlockUtil.finalizeDecodeInputs(decodeInputs, dataBlkNum,
parityBlkNum, alignedStripe);
StripedBlockUtil.finalizeDecodeInputs(decodeInputs, alignedStripe);
StripedBlockUtil.decodeAndFillBuffer(decodeInputs, alignedStripe,
dataBlkNum, parityBlkNum, decoder);
}
@@ -867,12 +864,9 @@ public class DFSStripedInputStream extends DFSInputStream {
int pos = (int) (range.offsetInBlock % cellSize + cellSize * i);
cur.position(pos);
cur.limit((int) (pos + range.spanInBlock));
- final int decodeIndex = StripedBlockUtil.convertIndex4Decode(i,
- dataBlkNum, parityBlkNum);
- decodeInputs[decodeIndex] = cur.slice();
+ decodeInputs[i] = cur.slice();
if (alignedStripe.chunks[i] == null) {
- alignedStripe.chunks[i] = new StripingChunk(
- decodeInputs[decodeIndex]);
+ alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]);
}
}
}
@@ -887,13 +881,12 @@ public class DFSStripedInputStream extends DFSInputStream {
// we have failed the block reader before
return false;
}
- final int decodeIndex = StripedBlockUtil.convertIndex4Decode(index,
- dataBlkNum, parityBlkNum);
+ final int parityIndex = index - dataBlkNum;
ByteBuffer buf = getParityBuffer().duplicate();
- buf.position(cellSize * decodeIndex);
- buf.limit(cellSize * decodeIndex + (int) alignedStripe.range.spanInBlock);
- decodeInputs[decodeIndex] = buf.slice();
- alignedStripe.chunks[index] = new StripingChunk(decodeInputs[decodeIndex]);
+ buf.position(cellSize * parityIndex);
+ buf.limit(cellSize * parityIndex + (int) alignedStripe.range.spanInBlock);
+ decodeInputs[index] = buf.slice();
+ alignedStripe.chunks[index] = new StripingChunk(decodeInputs[index]);
return true;
}
@@ -902,18 +895,16 @@ public class DFSStripedInputStream extends DFSInputStream {
// TODO no copy for data chunks. this depends on HADOOP-12047
final int span = (int) alignedStripe.getSpanInBlock();
for (int i = 0; i < alignedStripe.chunks.length; i++) {
- final int decodeIndex = StripedBlockUtil.convertIndex4Decode(i,
- dataBlkNum, parityBlkNum);
if (alignedStripe.chunks[i] != null &&
alignedStripe.chunks[i].state == StripingChunk.ALLZERO) {
for (int j = 0; j < span; j++) {
- decodeInputs[decodeIndex].put((byte) 0);
+ decodeInputs[i].put((byte) 0);
}
- decodeInputs[decodeIndex].flip();
+ decodeInputs[i].flip();
} else if (alignedStripe.chunks[i] != null &&
alignedStripe.chunks[i].state == StripingChunk.FETCHED) {
- decodeInputs[decodeIndex].position(0);
- decodeInputs[decodeIndex].limit(span);
+ decodeInputs[i].position(0);
+ decodeInputs[i].limit(span);
}
}
int[] decodeIndices = new int[parityBlkNum];
@@ -921,12 +912,10 @@ public class DFSStripedInputStream extends DFSInputStream {
for (int i = 0; i < alignedStripe.chunks.length; i++) {
if (alignedStripe.chunks[i] != null &&
alignedStripe.chunks[i].state == StripingChunk.MISSING) {
- int decodeIndex = StripedBlockUtil.convertIndex4Decode(i,
- dataBlkNum, parityBlkNum);
if (i < dataBlkNum) {
- decodeIndices[pos++] = decodeIndex;
+ decodeIndices[pos++] = i;
} else {
- decodeInputs[decodeIndex] = null;
+ decodeInputs[i] = null;
}
}
}

File: StripedBlockUtil.java

@@ -270,8 +270,7 @@ public class StripedBlockUtil {
// read the full data aligned stripe
for (int i = 0; i < dataBlkNum; i++) {
if (alignedStripe.chunks[i] == null) {
- final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
- alignedStripe.chunks[i] = new StripingChunk(decodeInputs[decodeIndex]);
+ alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]);
alignedStripe.chunks[i].addByteArraySlice(0,
(int) alignedStripe.getSpanInBlock());
}
@@ -287,40 +286,19 @@ public class StripedBlockUtil {
* finalize decode input buffers.
*/
public static void finalizeDecodeInputs(final byte[][] decodeInputs,
- int dataBlkNum, int parityBlkNum, AlignedStripe alignedStripe) {
+ AlignedStripe alignedStripe) {
for (int i = 0; i < alignedStripe.chunks.length; i++) {
final StripingChunk chunk = alignedStripe.chunks[i];
- final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
if (chunk != null && chunk.state == StripingChunk.FETCHED) {
- chunk.copyTo(decodeInputs[decodeIndex]);
+ chunk.copyTo(decodeInputs[i]);
} else if (chunk != null && chunk.state == StripingChunk.ALLZERO) {
- Arrays.fill(decodeInputs[decodeIndex], (byte) 0);
+ Arrays.fill(decodeInputs[i], (byte) 0);
} else {
- decodeInputs[decodeIndex] = null;
+ decodeInputs[i] = null;
}
}
}
- /**
- * Currently decoding requires parity chunks are before data chunks.
- * The indices are opposite to what we store in NN. In future we may
- * improve the decoding to make the indices order the same as in NN.
- *
- * @param index The index to convert
- * @param dataBlkNum The number of data blocks
- * @param parityBlkNum The number of parity blocks
- * @return converted index
- */
- public static int convertIndex4Decode(int index, int dataBlkNum,
- int parityBlkNum) {
- return index < dataBlkNum ? index + parityBlkNum : index - dataBlkNum;
- }
- public static int convertDecodeIndexBack(int index, int dataBlkNum,
- int parityBlkNum) {
- return index < parityBlkNum ? index + dataBlkNum : index - parityBlkNum;
- }
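
The removed helpers only shuttled indexes between NameNode order and the decoder's old parity-first order; with data-first decoding the mapping is the identity, so they can go. For reference, the old forward mapping for RS(6, 3), with the removed one-liner reproduced verbatim in a standalone sketch:

    public class ConvertIndexSketch {
      // The removed helper: data indexes shift behind parity, parity moves up front.
      static int convertIndex4Decode(int index, int dataBlkNum, int parityBlkNum) {
        return index < dataBlkNum ? index + parityBlkNum : index - dataBlkNum;
      }

      public static void main(String[] args) {
        final int dataBlkNum = 6, parityBlkNum = 3;
        for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
          System.out.println(i + " -> " + convertIndex4Decode(i, dataBlkNum, parityBlkNum));
        }
        // d0..d5 (0..5) -> 3..8, p0..p2 (6..8) -> 0..2; after this patch, i -> i.
      }
    }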
/**
* Decode based on the given input buffers and erasure coding policy.
*/
@@ -333,7 +311,7 @@ public class StripedBlockUtil {
for (int i = 0; i < dataBlkNum; i++) {
if (alignedStripe.chunks[i] != null &&
alignedStripe.chunks[i].state == StripingChunk.MISSING){
- decodeIndices[pos++] = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
+ decodeIndices[pos++] = i;
}
}
decodeIndices = Arrays.copyOf(decodeIndices, pos);
@@ -345,8 +323,7 @@ public class StripedBlockUtil {
// Step 3: fill original application buffer with decoded data
for (int i = 0; i < decodeIndices.length; i++) {
- int missingBlkIdx = convertDecodeIndexBack(decodeIndices[i],
- dataBlkNum, parityBlkNum);
+ int missingBlkIdx = decodeIndices[i];
StripingChunk chunk = alignedStripe.chunks[missingBlkIdx];
if (chunk.state == StripingChunk.MISSING) {
chunk.copyFrom(decodeOutputs[i]);

File: ErasureCodingWorker.java

@@ -79,8 +79,6 @@ import org.apache.hadoop.util.DataChecksum;
import com.google.common.base.Preconditions;
- import static org.apache.hadoop.hdfs.util.StripedBlockUtil.convertIndex4Decode;
/**
* ErasureCodingWorker handles the erasure coding recovery work commands. These
* commands would be issued from Namenode as part of Datanode's heart beat
@@ -621,8 +619,7 @@ public final class ErasureCodingWorker {
int m = 0;
for (int i = 0; i < targets.length; i++) {
if (targetsStatus[i]) {
- result[m++] = convertIndex4Decode(targetIndices[i],
- dataBlkNum, parityBlkNum);
+ result[m++] = targetIndices[i];
}
}
return Arrays.copyOf(result, m);
@@ -636,15 +633,13 @@ public final class ErasureCodingWorker {
StripedReader reader = stripedReaders.get(success[i]);
ByteBuffer buffer = reader.buffer;
paddingBufferToLen(buffer, toRecoverLen);
- inputs[convertIndex4Decode(reader.index, dataBlkNum, parityBlkNum)] =
- (ByteBuffer)buffer.flip();
+ inputs[reader.index] = (ByteBuffer)buffer.flip();
}
if (success.length < dataBlkNum) {
for (int i = 0; i < zeroStripeBuffers.length; i++) {
ByteBuffer buffer = zeroStripeBuffers[i];
paddingBufferToLen(buffer, toRecoverLen);
- int index = convertIndex4Decode(zeroStripeIndices[i], dataBlkNum,
- parityBlkNum);
+ int index = zeroStripeIndices[i];
inputs[index] = (ByteBuffer)buffer.flip();
}
}

File: TestDFSStripedInputStream.java

@@ -211,32 +211,33 @@ public class TestDFSStripedInputStream {
}
}
+ RawErasureDecoder rawDecoder = CodecUtil.createRSRawDecoder(conf,
+ DATA_BLK_NUM, PARITY_BLK_NUM);
// Update the expected content for decoded data
for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
byte[][] decodeInputs = new byte[DATA_BLK_NUM + PARITY_BLK_NUM][CELLSIZE];
- int[] missingBlkIdx = new int[]{failedDNIdx + PARITY_BLK_NUM, 1, 2};
+ int[] missingBlkIdx = new int[]{failedDNIdx, 7, 8};
byte[][] decodeOutputs = new byte[PARITY_BLK_NUM][CELLSIZE];
for (int j = 0; j < DATA_BLK_NUM; j++) {
int posInBuf = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE;
if (j != failedDNIdx) {
- System.arraycopy(expected, posInBuf, decodeInputs[j + PARITY_BLK_NUM],
- 0, CELLSIZE);
+ System.arraycopy(expected, posInBuf, decodeInputs[j], 0, CELLSIZE);
}
}
for (int k = 0; k < CELLSIZE; k++) {
int posInBlk = i * CELLSIZE + k;
- decodeInputs[0][k] = SimulatedFSDataset.simulatedByte(
+ decodeInputs[DATA_BLK_NUM][k] = SimulatedFSDataset.simulatedByte(
new Block(bg.getBlock().getBlockId() + DATA_BLK_NUM), posInBlk);
}
for (int m : missingBlkIdx) {
decodeInputs[m] = null;
}
- RawErasureDecoder rawDecoder = CodecUtil.createRSRawDecoder(conf,
- DATA_BLK_NUM, PARITY_BLK_NUM);
rawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs);
int posInBuf = i * CELLSIZE * DATA_BLK_NUM + failedDNIdx * CELLSIZE;
System.arraycopy(decodeOutputs[0], 0, expected, posInBuf, CELLSIZE);
}
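
With DATA_BLK_NUM = 6 and PARITY_BLK_NUM = 3, the decode inputs are [d0..d5, p0, p1, p2]; the test reads only p0 (index 6), so the missing set is the failed data index plus the two unread parities, and the data index no longer needs an offset. In miniature, with an arbitrary illustrative failedDNIdx:

    import java.util.Arrays;

    public class MissingIndexSketch {
      public static void main(String[] args) {
        int failedDNIdx = 4;                      // arbitrary failed data unit
        int[] oldOrder = {failedDNIdx + 3, 1, 2}; // parity-first: d4 at 7, p1 at 1, p2 at 2
        int[] newOrder = {failedDNIdx, 7, 8};     // data-first: d4 at 4, p1 at 7, p2 at 8
        System.out.println(Arrays.toString(oldOrder) + " -> " + Arrays.toString(newOrder));
      }
    }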
int delta = 10;
int done = 0;
// read a small delta, shouldn't trigger decode