HADOOP-11566. Add tests and fix for erasure coders to recover erased parity units. Contributed by Kai Zheng.

This commit is contained in:
Zhe Zhang 2015-05-18 10:13:03 -07:00 committed by Zhe Zhang
parent 09c3a375ba
commit b64f6745a4
10 changed files with 134 additions and 77 deletions

View File

@ -48,3 +48,6 @@
HADOOP-11921. Enhance tests for erasure coders. (Kai Zheng via Zhe Zhang)
HADOOP-11920. Refactor some codes for erasure coders. (Kai Zheng via Zhe Zhang)
HADOOP-11566. Add tests and fix for erasure coders to recover erased parity
units. (Kai Zheng via Zhe Zhang)

View File

@ -58,8 +58,14 @@ public class ECChunk {
public static ByteBuffer[] toBuffers(ECChunk[] chunks) {
ByteBuffer[] buffers = new ByteBuffer[chunks.length];
ECChunk chunk;
for (int i = 0; i < chunks.length; i++) {
buffers[i] = chunks[i].getBuffer();
chunk = chunks[i];
if (chunk == null) {
buffers[i] = null;
} else {
buffers[i] = chunk.getBuffer();
}
}
return buffers;
@ -75,8 +81,15 @@ public class ECChunk {
byte[][] bytesArr = new byte[chunks.length][];
ByteBuffer buffer;
ECChunk chunk;
for (int i = 0; i < chunks.length; i++) {
buffer = chunks[i].getBuffer();
chunk = chunks[i];
if (chunk == null) {
bytesArr[i] = null;
continue;
}
buffer = chunk.getBuffer();
if (buffer.hasArray()) {
bytesArr[i] = buffer.array();
} else {

View File

@ -60,16 +60,21 @@ public abstract class AbstractErasureDecoder extends AbstractErasureCoder {
}
/**
* Which blocks were erased ? We only care data blocks here. Sub-classes can
* override this behavior.
* Which blocks were erased ?
* @param blockGroup
* @return output blocks to recover
*/
protected ECBlock[] getOutputBlocks(ECBlockGroup blockGroup) {
ECBlock[] outputBlocks = new ECBlock[
getNumErasedBlocks(blockGroup.getDataBlocks())];
ECBlock[] outputBlocks = new ECBlock[getNumErasedBlocks(blockGroup)];
int idx = 0;
for (int i = 0; i < getNumParityUnits(); i++) {
if (blockGroup.getParityBlocks()[i].isErased()) {
outputBlocks[idx++] = blockGroup.getParityBlocks()[i];
}
}
for (int i = 0; i < getNumDataUnits(); i++) {
if (blockGroup.getDataBlocks()[i].isErased()) {
outputBlocks[idx++] = blockGroup.getDataBlocks()[i];

View File

@ -37,10 +37,12 @@ public abstract class TestCoderBase {
protected int numParityUnits;
protected int chunkSize = 16 * 1024;
// Indexes of erased data units. Will also support test of erasing
// parity units
// Indexes of erased data units.
protected int[] erasedDataIndexes = new int[] {0};
// Indexes of erased parity units.
protected int[] erasedParityIndexes = new int[] {0};
// Data buffers are either direct or on-heap, for performance the two cases
// may go to different coding implementations.
protected boolean usingDirectBuffer = true;
@ -52,12 +54,15 @@ public abstract class TestCoderBase {
* @param erasedDataIndexes
*/
protected void prepare(Configuration conf, int numDataUnits,
int numParityUnits, int[] erasedDataIndexes) {
int numParityUnits, int[] erasedDataIndexes,
int[] erasedParityIndexes) {
this.conf = conf;
this.numDataUnits = numDataUnits;
this.numParityUnits = numParityUnits;
this.erasedDataIndexes = erasedDataIndexes != null ?
erasedDataIndexes : new int[] {0};
this.erasedParityIndexes = erasedParityIndexes != null ?
erasedParityIndexes : new int[] {0};
}
/**
@ -87,10 +92,15 @@ public abstract class TestCoderBase {
* @return erased indexes altogether
*/
protected int[] getErasedIndexesForDecoding() {
int[] erasedIndexesForDecoding = new int[erasedDataIndexes.length];
int[] erasedIndexesForDecoding =
new int[erasedParityIndexes.length + erasedDataIndexes.length];
int idx = 0;
for (int i = 0; i < erasedParityIndexes.length; i++) {
erasedIndexesForDecoding[idx ++] = erasedParityIndexes[i];
}
for (int i = 0; i < erasedDataIndexes.length; i++) {
erasedIndexesForDecoding[idx ++] = erasedDataIndexes[i] + numParityUnits;
}
@ -123,15 +133,25 @@ public abstract class TestCoderBase {
* Erase chunks to test the recovering of them. Before erasure clone them
* first so could return them.
* @param dataChunks
* @param parityChunks
* @return clone of erased chunks
*/
protected ECChunk[] backupAndEraseChunks(ECChunk[] dataChunks) {
ECChunk[] toEraseChunks = new ECChunk[erasedDataIndexes.length];
protected ECChunk[] backupAndEraseChunks(ECChunk[] dataChunks,
ECChunk[] parityChunks) {
ECChunk[] toEraseChunks = new ECChunk[erasedParityIndexes.length +
erasedDataIndexes.length];
int idx = 0;
ECChunk chunk;
for (int i = 0; i < erasedParityIndexes.length; i++) {
chunk = parityChunks[erasedParityIndexes[i]];
toEraseChunks[idx ++] = cloneChunkWithData(chunk);
eraseDataFromChunk(chunk);
}
for (int i = 0; i < erasedDataIndexes.length; i++) {
ECChunk chunk = dataChunks[erasedDataIndexes[i]];
chunk = dataChunks[erasedDataIndexes[i]];
toEraseChunks[idx ++] = cloneChunkWithData(chunk);
eraseDataFromChunk(chunk);
}
@ -273,7 +293,8 @@ public abstract class TestCoderBase {
* @return
*/
protected ECChunk[] prepareOutputChunksForDecoding() {
ECChunk[] chunks = new ECChunk[erasedDataIndexes.length];
ECChunk[] chunks = new ECChunk[erasedDataIndexes.length +
erasedParityIndexes.length];
for (int i = 0; i < chunks.length; i++) {
chunks[i] = allocateOutputChunk();

View File

@ -63,13 +63,15 @@ public abstract class TestErasureCoderBase extends TestCoderBase {
ECBlockGroup blockGroup = prepareBlockGroupForEncoding();
// Backup all the source chunks for later recovering because some coders
// may affect the source data.
TestBlock[] clonedDataBlocks = cloneBlocksWithData((TestBlock[]) blockGroup.getDataBlocks());
TestBlock[] clonedDataBlocks =
cloneBlocksWithData((TestBlock[]) blockGroup.getDataBlocks());
TestBlock[] parityBlocks = (TestBlock[]) blockGroup.getParityBlocks();
ErasureCodingStep codingStep;
codingStep = encoder.calculateCoding(blockGroup);
performCodingStep(codingStep);
// Erase specified sources but return copies of them for later comparing
TestBlock[] backupBlocks = backupAndEraseBlocks(clonedDataBlocks);
TestBlock[] backupBlocks = backupAndEraseBlocks(clonedDataBlocks, parityBlocks);
// Decode
blockGroup = new ECBlockGroup(clonedDataBlocks, blockGroup.getParityBlocks());
@ -207,13 +209,22 @@ public abstract class TestErasureCoderBase extends TestCoderBase {
* @param dataBlocks
* @return clone of erased dataBlocks
*/
protected TestBlock[] backupAndEraseBlocks(TestBlock[] dataBlocks) {
TestBlock[] toEraseBlocks = new TestBlock[erasedDataIndexes.length];
protected TestBlock[] backupAndEraseBlocks(TestBlock[] dataBlocks,
TestBlock[] parityBlocks) {
TestBlock[] toEraseBlocks = new TestBlock[erasedDataIndexes.length +
erasedParityIndexes.length];
int idx = 0;
TestBlock block;
for (int i = 0; i < erasedParityIndexes.length; i++) {
block = parityBlocks[erasedParityIndexes[i]];
toEraseBlocks[idx ++] = cloneBlockWithData(block);
eraseDataFromBlock(block);
}
for (int i = 0; i < erasedDataIndexes.length; i++) {
TestBlock block = dataBlocks[erasedDataIndexes[i]];
block = dataBlocks[erasedDataIndexes[i]];
toEraseBlocks[idx ++] = cloneBlockWithData(block);
eraseDataFromBlock(block);
}
@ -221,22 +232,6 @@ public abstract class TestErasureCoderBase extends TestCoderBase {
return toEraseBlocks;
}
/**
* Copy those data blocks that's to be erased for later comparing and
* verifying.
* @param dataBlocks
* @return
*/
protected TestBlock[] copyDataBlocksToErase(TestBlock[] dataBlocks) {
TestBlock[] copiedBlocks = new TestBlock[erasedDataIndexes.length];
for (int i = 0; i < erasedDataIndexes.length; ++i) {
copiedBlocks[i] = cloneBlockWithData(dataBlocks[erasedDataIndexes[i]]);
}
return copiedBlocks;
}
/**
* Allocate an output block. Note the chunk buffer will be allocated by the
* up caller when performing the coding step.

View File

@ -40,8 +40,8 @@ public class TestRSErasureCoder extends TestErasureCoderBase {
}
@Test
public void testCodingNoDirectBuffer_10x4_erasing_d0() {
prepare(null, 10, 4, new int[] {0});
public void testCodingNoDirectBuffer_10x4_erasing_d0_p0() {
prepare(null, 10, 4, new int[] {0}, new int[] {0});
/**
* Doing twice to test if the coders can be repeatedly reused. This matters
* as the underlying coding buffers are shared, which may have bugs.
@ -53,34 +53,41 @@ public class TestRSErasureCoder extends TestErasureCoderBase {
@Test
public void testCodingDirectBufferWithConf_10x4_erasing_d0() {
/**
* This tests if the two configuration items work or not.
* This tests if the configuration items work or not.
*/
Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY, RSRawErasureCoderFactory.class.getCanonicalName());
prepare(conf, 10, 4, new int[]{0});
conf.set(CommonConfigurationKeys.IO_ERASURECODE_CODEC_RS_RAWCODER_KEY,
RSRawErasureCoderFactory.class.getCanonicalName());
prepare(conf, 10, 4, new int[]{0}, new int[0]);
testCoding(true);
testCoding(true);
}
@Test
public void testCodingDirectBuffer_10x4_erasing_p1() {
prepare(null, 10, 4, new int[]{}, new int[]{1});
testCoding(true);
testCoding(true);
}
@Test
public void testCodingDirectBuffer_10x4_erasing_d2() {
prepare(null, 10, 4, new int[] {2});
prepare(null, 10, 4, new int[] {2}, new int[] {});
testCoding(true);
testCoding(true);
}
@Test
public void testCodingDirectBuffer_10x4_erasing_d0() {
prepare(null, 10, 4, new int[] {0});
public void testCodingDirectBuffer_10x4_erasing_d0_p0() {
prepare(null, 10, 4, new int[] {0}, new int[] {0});
testCoding(true);
testCoding(true);
}
@Test
public void testCodingBothBuffers_10x4_erasing_d0() {
prepare(null, 10, 4, new int[] {0});
public void testCodingBothBuffers_10x4_erasing_d0_p0() {
prepare(null, 10, 4, new int[] {0}, new int[] {0});
/**
* Doing in mixed buffer usage model to test if the coders can be repeatedly
@ -94,27 +101,26 @@ public class TestRSErasureCoder extends TestErasureCoderBase {
}
@Test
public void testCodingDirectBuffer_10x4_erasure_of_d2_d4() {
prepare(null, 10, 4, new int[] {2, 4});
public void testCodingDirectBuffer_10x4_erasure_of_d2_d4_p0() {
prepare(null, 10, 4, new int[] {2, 4}, new int[] {0});
testCoding(true);
}
@Test
public void testCodingDirectBuffer_10x4_erasing_d0_d1() {
prepare(null, 10, 4, new int[] {0, 1});
public void testCodingDirectBuffer_10x4_erasing_d0_d1_p0_p1() {
prepare(null, 10, 4, new int[] {0, 1}, new int[] {0, 1});
testCoding(true);
}
@Test
public void testCodingNoDirectBuffer_3x3_erasing_d0() {
prepare(null, 3, 3, new int[] {0});
public void testCodingNoDirectBuffer_3x3_erasing_d0_p0() {
prepare(null, 3, 3, new int[] {0}, new int[] {0});
testCoding(false);
}
@Test
public void testCodingDirectBuffer_3x3_erasing_d0() {
prepare(null, 3, 3, new int[] {0});
public void testCodingDirectBuffer_3x3_erasing_d0_p0() {
prepare(null, 3, 3, new int[] {0}, new int[] {0});
testCoding(true);
}
}

View File

@ -36,8 +36,8 @@ public class TestXORCoder extends TestErasureCoderBase {
}
@Test
public void testCodingNoDirectBuffer_erasing_d0() {
prepare(null, 10, 1, new int[] {0});
public void testCodingNoDirectBuffer_erasing_p0() {
prepare(null, 10, 1, new int[0], new int[] {0});
/**
* Doing twice to test if the coders can be repeatedly reused. This matters
@ -49,7 +49,7 @@ public class TestXORCoder extends TestErasureCoderBase {
@Test
public void testCodingBothBuffers_erasing_d5() {
prepare(null, 10, 1, new int[]{5});
prepare(null, 10, 1, new int[]{5}, new int[0]);
/**
* Doing in mixed buffer usage model to test if the coders can be repeatedly

View File

@ -32,8 +32,8 @@ public class TestRSRawCoder extends TestRSRawCoderBase {
}
@Test
public void testCodingNoDirectBuffer_10x4_erasing_d0() {
prepare(null, 10, 4, new int[] {0});
public void testCodingNoDirectBuffer_10x4_erasing_d0_p0() {
prepare(null, 10, 4, new int[] {0}, new int[] {0});
/**
* Doing twice to test if the coders can be repeatedly reused. This matters
* as the underlying coding buffers are shared, which may have bugs.
@ -42,23 +42,30 @@ public class TestRSRawCoder extends TestRSRawCoderBase {
testCoding(false);
}
@Test
public void testCodingDirectBuffer_10x4_erasing_p1() {
prepare(null, 10, 4, new int[] {}, new int[] {1});
testCoding(true);
testCoding(true);
}
@Test
public void testCodingDirectBuffer_10x4_erasing_d2() {
prepare(null, 10, 4, new int[] {2});
prepare(null, 10, 4, new int[] {2}, new int[] {});
testCoding(true);
testCoding(true);
}
@Test
public void testCodingDirectBuffer_10x4_erasing_d0() {
prepare(null, 10, 4, new int[] {0});
public void testCodingDirectBuffer_10x4_erasing_d0_p0() {
prepare(null, 10, 4, new int[] {0}, new int[] {0});
testCoding(true);
testCoding(true);
}
@Test
public void testCodingBothBuffers_10x4_erasing_d0() {
prepare(null, 10, 4, new int[] {0});
public void testCodingBothBuffers_10x4_erasing_d0_p0() {
prepare(null, 10, 4, new int[] {0}, new int[] {0});
/**
* Doing in mixed buffer usage model to test if the coders can be repeatedly
@ -72,26 +79,26 @@ public class TestRSRawCoder extends TestRSRawCoderBase {
}
@Test
public void testCodingDirectBuffer_10x4_erasure_of_d2_d4() {
prepare(null, 10, 4, new int[] {2, 4});
public void testCodingDirectBuffer_10x4_erasure_of_d2_d4_p0() {
prepare(null, 10, 4, new int[] {2, 4}, new int[] {0});
testCoding(true);
}
@Test
public void testCodingDirectBuffer_10x4_erasing_d0_d1() {
prepare(null, 10, 4, new int[] {0, 1});
public void testCodingDirectBuffer_10x4_erasing_d0_d1_p0_p1() {
prepare(null, 10, 4, new int[] {0, 1}, new int[] {0, 1});
testCoding(true);
}
@Test
public void testCodingNoDirectBuffer_3x3_erasing_d0() {
prepare(null, 3, 3, new int[] {0});
public void testCodingNoDirectBuffer_3x3_erasing_d0_p0() {
prepare(null, 3, 3, new int[] {0}, new int[] {0});
testCoding(false);
}
@Test
public void testCodingDirectBuffer_3x3_erasing_d0() {
prepare(null, 3, 3, new int[] {0});
public void testCodingDirectBuffer_3x3_erasing_d0_p0() {
prepare(null, 3, 3, new int[] {0}, new int[] {0});
testCoding(true);
}
}

View File

@ -52,7 +52,7 @@ public abstract class TestRawCoderBase extends TestCoderBase {
encoder.encode(dataChunks, parityChunks);
// Backup and erase some chunks
ECChunk[] backupChunks = backupAndEraseChunks(clonedDataChunks);
ECChunk[] backupChunks = backupAndEraseChunks(clonedDataChunks, parityChunks);
// Decode
ECChunk[] inputChunks = prepareInputChunksForDecoding(

View File

@ -36,7 +36,7 @@ public class TestXORRawCoder extends TestRawCoderBase {
@Test
public void testCodingNoDirectBuffer_erasing_d0() {
prepare(null, 10, 1, new int[] {0});
prepare(null, 10, 1, new int[] {0}, new int[0]);
/**
* Doing twice to test if the coders can be repeatedly reused. This matters
@ -46,9 +46,16 @@ public class TestXORRawCoder extends TestRawCoderBase {
testCoding(false);
}
@Test
public void testCodingDirectBuffer_erasing_p0() {
prepare(null, 10, 1, new int[0], new int[] {0});
testCoding(true);
testCoding(true);
}
@Test
public void testCodingBothBuffers_erasing_d5() {
prepare(null, 10, 1, new int[]{5});
prepare(null, 10, 1, new int[]{5}, new int[0]);
/**
* Doing in mixed buffer usage model to test if the coders can be repeatedly