From 4ad484883f773c702a1874fc12816ef1a4a54136 Mon Sep 17 00:00:00 2001
From: Kai Zheng
Date: Tue, 26 May 2015 22:45:19 +0800
Subject: [PATCH] HADOOP-11847 Enhance raw coder allowing to read least required inputs in decoding. Contributed by Kai Zheng

---
 .../hadoop-common/CHANGES-HDFS-EC-7285.txt        |   3 +
 .../rawcoder/AbstractRawErasureCoder.java         |  27 ++-
 .../rawcoder/AbstractRawErasureDecoder.java       |  75 ++++++--
 .../rawcoder/AbstractRawErasureEncoder.java       |   8 +-
 .../io/erasurecode/rawcoder/RSRawDecoder.java     | 162 ++++++++++++++++-
 .../rawcoder/RawErasureDecoder.java               |  20 ++-
 .../erasurecode/rawcoder/XORRawDecoder.java       |   2 +-
 .../erasurecode/rawcoder/XORRawEncoder.java       |   2 +-
 .../rawcoder/util/GaloisField.java                |  12 +-
 .../hadoop/io/erasurecode/TestCoderBase.java      |  39 ++---
 .../coder/TestErasureCoderBase.java               |   1 -
 .../erasurecode/rawcoder/TestRSRawCoder.java      | 163 +++++++++----------
 .../rawcoder/TestRawCoderBase.java                |  54 ++++++
 .../erasurecode/rawcoder/TestXORRawCoder.java     |  49 ++----
 14 files changed, 430 insertions(+), 187 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt b/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
index c9b80d3bba5..0c244737849 100644
--- a/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES-HDFS-EC-7285.txt
@@ -59,3 +59,6 @@
 
     HADOOP-12029. Remove chunkSize from ECSchema as its not required for coders
     (vinayakumarb)
+
+    HADOOP-11847. Enhance raw coder allowing to read least required inputs in decoding.
+    (Kai Zheng)
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java
index 06ae660f367..e6a1542b403 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureCoder.java
@@ -60,12 +60,13 @@ public abstract class AbstractRawErasureCoder
   }
 
   /**
-   * Ensure output buffer filled with ZERO bytes fully in chunkSize.
-   * @param buffer a buffer ready to write chunk size bytes
+   * Ensure a buffer filled with ZERO bytes from current readable/writable
+   * position.
+   * @param buffer a buffer ready to read / write certain size bytes
    * @return the buffer itself, with ZERO bytes written, the position and limit
    *         are not changed after the call
    */
-  protected ByteBuffer resetOutputBuffer(ByteBuffer buffer) {
+  protected ByteBuffer resetBuffer(ByteBuffer buffer) {
     int pos = buffer.position();
     for (int i = pos; i < buffer.limit(); ++i) {
       buffer.put((byte) 0);
@@ -77,7 +78,7 @@ public abstract class AbstractRawErasureCoder
 
   /**
    * Ensure the buffer (either input or output) ready to read or write with ZERO
-   * bytes fully in chunkSize.
+   * bytes fully in specified length of len.
    * @param buffer bytes array buffer
    * @return the buffer itself
    */
@@ -92,11 +93,16 @@ public abstract class AbstractRawErasureCoder
   /**
    * Check and ensure the buffers are of the length specified by dataLen.
    * @param buffers
+   * @param allowNull
    * @param dataLen
    */
-  protected void ensureLength(ByteBuffer[] buffers, int dataLen) {
+  protected void ensureLength(ByteBuffer[] buffers,
+                              boolean allowNull, int dataLen) {
     for (int i = 0; i < buffers.length; ++i) {
-      if (buffers[i].remaining() != dataLen) {
+      if (buffers[i] == null && !allowNull) {
+        throw new HadoopIllegalArgumentException(
+            "Invalid buffer found, not allowing null");
+      } else if (buffers[i] != null && buffers[i].remaining() != dataLen) {
         throw new HadoopIllegalArgumentException(
             "Invalid buffer, not of length " + dataLen);
       }
@@ -106,11 +112,16 @@ public abstract class AbstractRawErasureCoder
   /**
    * Check and ensure the buffers are of the length specified by dataLen.
    * @param buffers
+   * @param allowNull
    * @param dataLen
    */
-  protected void ensureLength(byte[][] buffers, int dataLen) {
+  protected void ensureLength(byte[][] buffers,
+                              boolean allowNull, int dataLen) {
     for (int i = 0; i < buffers.length; ++i) {
-      if (buffers[i].length != dataLen) {
+      if (buffers[i] == null && !allowNull) {
+        throw new HadoopIllegalArgumentException(
+            "Invalid buffer found, not allowing null");
+      } else if (buffers[i] != null && buffers[i].length != dataLen) {
         throw new HadoopIllegalArgumentException(
             "Invalid buffer not of length " + dataLen);
       }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java
index 0c1f80f9683..c6105b0921c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureDecoder.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.io.erasurecode.ECChunk;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 
 /**
  * An abstract raw erasure decoder that's to be inherited by new decoders.
@@ -38,14 +39,16 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder public void decode(ByteBuffer[] inputs, int[] erasedIndexes, ByteBuffer[] outputs) { checkParameters(inputs, erasedIndexes, outputs); - int dataLen = inputs[0].remaining(); + + ByteBuffer validInput = findFirstValidInput(inputs); + int dataLen = validInput.remaining(); if (dataLen == 0) { return; } - ensureLength(inputs, dataLen); - ensureLength(outputs, dataLen); + ensureLength(inputs, true, dataLen); + ensureLength(outputs, false, dataLen); - boolean usingDirectBuffer = inputs[0].isDirect(); + boolean usingDirectBuffer = validInput.isDirect(); if (usingDirectBuffer) { doDecode(inputs, erasedIndexes, outputs); return; @@ -59,8 +62,10 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder ByteBuffer buffer; for (int i = 0; i < inputs.length; ++i) { buffer = inputs[i]; - inputOffsets[i] = buffer.position(); - newInputs[i] = buffer.array(); + if (buffer != null) { + inputOffsets[i] = buffer.position(); + newInputs[i] = buffer.array(); + } } for (int i = 0; i < outputs.length; ++i) { @@ -74,7 +79,10 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder for (int i = 0; i < inputs.length; ++i) { buffer = inputs[i]; - buffer.position(inputOffsets[i] + dataLen); // dataLen bytes consumed + if (buffer != null) { + // dataLen bytes consumed + buffer.position(inputOffsets[i] + dataLen); + } } } @@ -90,12 +98,14 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder @Override public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs) { checkParameters(inputs, erasedIndexes, outputs); - int dataLen = inputs[0].length; + + byte[] validInput = findFirstValidInput(inputs); + int dataLen = validInput.length; if (dataLen == 0) { return; } - ensureLength(inputs, dataLen); - ensureLength(outputs, dataLen); + ensureLength(inputs, true, dataLen); + ensureLength(outputs, false, dataLen); int[] inputOffsets = new int[inputs.length]; // ALL ZERO int[] outputOffsets = new int[outputs.length]; // ALL ZERO @@ -148,5 +158,50 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder throw new HadoopIllegalArgumentException( "Too many erased, not recoverable"); } + + int validInputs = 0; + for (int i = 0; i < inputs.length; ++i) { + if (inputs[i] != null) { + validInputs += 1; + } + } + + if (validInputs < getNumDataUnits()) { + throw new HadoopIllegalArgumentException( + "No enough valid inputs are provided, not recoverable"); + } + } + + /** + * Get indexes into inputs array for items marked as null, either erased or + * not to read. + * @return indexes into inputs array + */ + protected int[] getErasedOrNotToReadIndexes(Object[] inputs) { + int[] invalidIndexes = new int[inputs.length]; + int idx = 0; + for (int i = 0; i < inputs.length; i++) { + if (inputs[i] == null) { + invalidIndexes[idx++] = i; + } + } + + return Arrays.copyOf(invalidIndexes, idx); + } + + /** + * Find the valid input from all the inputs. 
+ * @param inputs + * @return the first valid input + */ + protected static T findFirstValidInput(T[] inputs) { + for (int i = 0; i < inputs.length; i++) { + if (inputs[i] != null) { + return inputs[i]; + } + } + + throw new HadoopIllegalArgumentException( + "Invalid inputs are found, all being null"); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java index c7a136b0f4f..d1faa8c65be 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/AbstractRawErasureEncoder.java @@ -41,8 +41,8 @@ public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder if (dataLen == 0) { return; } - ensureLength(inputs, dataLen); - ensureLength(outputs, dataLen); + ensureLength(inputs, false, dataLen); + ensureLength(outputs, false, dataLen); boolean usingDirectBuffer = inputs[0].isDirect(); if (usingDirectBuffer) { @@ -90,8 +90,8 @@ public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder if (dataLen == 0) { return; } - ensureLength(inputs, dataLen); - ensureLength(outputs, dataLen); + ensureLength(inputs, false, dataLen); + ensureLength(outputs, false, dataLen); int[] inputOffsets = new int[inputs.length]; // ALL ZERO int[] outputOffsets = new int[outputs.length]; // ALL ZERO diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawDecoder.java index e265dcef5d3..57e6957435c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RSRawDecoder.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.io.erasurecode.rawcoder; +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.io.erasurecode.rawcoder.util.RSUtil; import java.nio.ByteBuffer; @@ -25,35 +26,64 @@ import java.nio.ByteBuffer; * A raw erasure decoder in RS code scheme in pure Java in case native one * isn't available in some environment. Please always use native implementations * when possible. + * + * Currently this implementation will compute and decode not to read units + * unnecessarily due to the underlying implementation limit in GF. This will be + * addressed in HADOOP-11871. */ public class RSRawDecoder extends AbstractRawErasureDecoder { // To describe and calculate the needed Vandermonde matrix private int[] errSignature; private int[] primitivePower; + /** + * We need a set of reusable buffers either for the bytes array + * decoding version or direct buffer decoding version. Normally not both. + * + * For output, in addition to the valid buffers from the caller + * passed from above, we need to provide extra buffers for the internal + * decoding implementation. For output, the caller should provide no more + * than numParityUnits but at least one buffers. And the left buffers will be + * borrowed from either bytesArrayBuffers, for the bytes array version. 
+ * + */ + // Reused buffers for decoding with bytes arrays + private byte[][] bytesArrayBuffers = new byte[getNumParityUnits()][]; + private byte[][] adjustedByteArrayOutputsParameter = + new byte[getNumParityUnits()][]; + private int[] adjustedOutputOffsets = new int[getNumParityUnits()]; + + // Reused buffers for decoding with direct ByteBuffers + private ByteBuffer[] directBuffers = new ByteBuffer[getNumParityUnits()]; + private ByteBuffer[] adjustedDirectBufferOutputsParameter = + new ByteBuffer[getNumParityUnits()]; + public RSRawDecoder(int numDataUnits, int numParityUnits) { super(numDataUnits, numParityUnits); - assert (getNumDataUnits() + getNumParityUnits() < RSUtil.GF.getFieldSize()); + if (numDataUnits + numParityUnits >= RSUtil.GF.getFieldSize()) { + throw new HadoopIllegalArgumentException( + "Invalid numDataUnits and numParityUnits"); + } this.errSignature = new int[numParityUnits]; this.primitivePower = RSUtil.getPrimitivePower(numDataUnits, numParityUnits); } - @Override - protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes, + private void doDecodeImpl(ByteBuffer[] inputs, int[] erasedIndexes, ByteBuffer[] outputs) { + ByteBuffer valid = findFirstValidInput(inputs); + int dataLen = valid.remaining(); for (int i = 0; i < erasedIndexes.length; i++) { errSignature[i] = primitivePower[erasedIndexes[i]]; - RSUtil.GF.substitute(inputs, outputs[i], primitivePower[i]); + RSUtil.GF.substitute(inputs, dataLen, outputs[i], primitivePower[i]); } RSUtil.GF.solveVandermondeSystem(errSignature, outputs, erasedIndexes.length); } - @Override - protected void doDecode(byte[][] inputs, int[] inputOffsets, + private void doDecodeImpl(byte[][] inputs, int[] inputOffsets, int dataLen, int[] erasedIndexes, byte[][] outputs, int[] outputOffsets) { for (int i = 0; i < erasedIndexes.length; i++) { @@ -63,6 +93,124 @@ public class RSRawDecoder extends AbstractRawErasureDecoder { } RSUtil.GF.solveVandermondeSystem(errSignature, outputs, outputOffsets, - erasedIndexes.length, dataLen); + erasedIndexes.length, dataLen); + } + + @Override + protected void doDecode(byte[][] inputs, int[] inputOffsets, + int dataLen, int[] erasedIndexes, + byte[][] outputs, int[] outputOffsets) { + /** + * As passed parameters are friendly to callers but not to the underlying + * implementations, so we have to adjust them before calling doDecodeImpl. + */ + + int[] erasedOrNotToReadIndexes = getErasedOrNotToReadIndexes(inputs); + + // Prepare for adjustedOutputsParameter + + // First reset the positions needed this time + for (int i = 0; i < erasedOrNotToReadIndexes.length; i++) { + adjustedByteArrayOutputsParameter[i] = null; + adjustedOutputOffsets[i] = 0; + } + // Use the caller passed buffers in erasedIndexes positions + for (int outputIdx = 0, i = 0; i < erasedIndexes.length; i++) { + boolean found = false; + for (int j = 0; j < erasedOrNotToReadIndexes.length; j++) { + // If this index is one requested by the caller via erasedIndexes, then + // we use the passed output buffer to avoid copying data thereafter. 
+ if (erasedIndexes[i] == erasedOrNotToReadIndexes[j]) { + found = true; + adjustedByteArrayOutputsParameter[j] = resetBuffer( + outputs[outputIdx], outputOffsets[outputIdx], dataLen); + adjustedOutputOffsets[j] = outputOffsets[outputIdx]; + outputIdx++; + } + } + if (!found) { + throw new HadoopIllegalArgumentException( + "Inputs not fully corresponding to erasedIndexes in null places"); + } + } + // Use shared buffers for other positions (not set yet) + for (int bufferIdx = 0, i = 0; i < erasedOrNotToReadIndexes.length; i++) { + if (adjustedByteArrayOutputsParameter[i] == null) { + adjustedByteArrayOutputsParameter[i] = resetBuffer( + checkGetBytesArrayBuffer(bufferIdx, dataLen), 0, dataLen); + adjustedOutputOffsets[i] = 0; // Always 0 for such temp output + bufferIdx++; + } + } + + doDecodeImpl(inputs, inputOffsets, dataLen, erasedOrNotToReadIndexes, + adjustedByteArrayOutputsParameter, adjustedOutputOffsets); + } + + @Override + protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes, + ByteBuffer[] outputs) { + ByteBuffer validInput = findFirstValidInput(inputs); + int dataLen = validInput.remaining(); + + /** + * As passed parameters are friendly to callers but not to the underlying + * implementations, so we have to adjust them before calling doDecodeImpl. + */ + + int[] erasedOrNotToReadIndexes = getErasedOrNotToReadIndexes(inputs); + + // Prepare for adjustedDirectBufferOutputsParameter + + // First reset the positions needed this time + for (int i = 0; i < erasedOrNotToReadIndexes.length; i++) { + adjustedDirectBufferOutputsParameter[i] = null; + } + // Use the caller passed buffers in erasedIndexes positions + for (int outputIdx = 0, i = 0; i < erasedIndexes.length; i++) { + boolean found = false; + for (int j = 0; j < erasedOrNotToReadIndexes.length; j++) { + // If this index is one requested by the caller via erasedIndexes, then + // we use the passed output buffer to avoid copying data thereafter. 
+ if (erasedIndexes[i] == erasedOrNotToReadIndexes[j]) { + found = true; + adjustedDirectBufferOutputsParameter[j] = + resetBuffer(outputs[outputIdx++]); + } + } + if (!found) { + throw new HadoopIllegalArgumentException( + "Inputs not fully corresponding to erasedIndexes in null places"); + } + } + // Use shared buffers for other positions (not set yet) + for (int bufferIdx = 0, i = 0; i < erasedOrNotToReadIndexes.length; i++) { + if (adjustedDirectBufferOutputsParameter[i] == null) { + ByteBuffer buffer = checkGetDirectBuffer(bufferIdx, dataLen); + buffer.position(0); + buffer.limit(dataLen); + adjustedDirectBufferOutputsParameter[i] = resetBuffer(buffer); + bufferIdx++; + } + } + + doDecodeImpl(inputs, erasedOrNotToReadIndexes, + adjustedDirectBufferOutputsParameter); + } + + private byte[] checkGetBytesArrayBuffer(int idx, int bufferLen) { + if (bytesArrayBuffers[idx] == null || + bytesArrayBuffers[idx].length < bufferLen) { + bytesArrayBuffers[idx] = new byte[bufferLen]; + } + return bytesArrayBuffers[idx]; + } + + private ByteBuffer checkGetDirectBuffer(int idx, int bufferLen) { + if (directBuffers[idx] == null || + directBuffers[idx].capacity() < bufferLen) { + directBuffers[idx] = ByteBuffer.allocateDirect(bufferLen); + } + return directBuffers[idx]; } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java index 1807da729a2..ad7f32d9177 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/RawErasureDecoder.java @@ -32,6 +32,22 @@ public interface RawErasureDecoder extends RawErasureCoder { /** * Decode with inputs and erasedIndexes, generates outputs. + * How to prepare for inputs: + * 1. Create an array containing parity units + data units; + * 2. Set null in the array locations specified via erasedIndexes to indicate + * they're erased and no data are to read from; + * 3. Set null in the array locations for extra redundant items, as they're + * not necessary to read when decoding. For example in RS-6-3, if only 1 + * unit is really erased, then we have 2 extra items as redundant. They can + * be set as null to indicate no data will be used from them. + * + * For an example using RS (6, 3), assuming sources (d0, d1, d2, d3, d4, d5) + * and parities (p0, p1, p2), d2 being erased. We can and may want to use only + * 6 units like (d1, d3, d4, d5, p0, p2) to recover d2. We will have: + * inputs = [p0, null(p1), p2, null(d0), d1, null(d2), d3, d4, d5] + * erasedIndexes = [5] // index of d2 into inputs array + * outputs = [a-writable-buffer] + * * @param inputs inputs to read data from * @param erasedIndexes indexes of erased units in the inputs array * @param outputs outputs to write into for data generated according to @@ -41,7 +57,7 @@ public interface RawErasureDecoder extends RawErasureCoder { ByteBuffer[] outputs); /** - * Decode with inputs and erasedIndexes, generates outputs. + * Decode with inputs and erasedIndexes, generates outputs. More see above. 
* @param inputs inputs to read data from * @param erasedIndexes indexes of erased units in the inputs array * @param outputs outputs to write into for data generated according to @@ -50,7 +66,7 @@ public interface RawErasureDecoder extends RawErasureCoder { public void decode(byte[][] inputs, int[] erasedIndexes, byte[][] outputs); /** - * Decode with inputs and erasedIndexes, generates outputs. + * Decode with inputs and erasedIndexes, generates outputs. More see above. * @param inputs inputs to read data from * @param erasedIndexes indexes of erased units in the inputs array * @param outputs outputs to write into for data generated according to diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawDecoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawDecoder.java index a09105c4ad5..e20e543422c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawDecoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawDecoder.java @@ -36,7 +36,7 @@ public class XORRawDecoder extends AbstractRawErasureDecoder { protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes, ByteBuffer[] outputs) { ByteBuffer output = outputs[0]; - resetOutputBuffer(output); + resetBuffer(output); int erasedIdx = erasedIndexes[0]; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawEncoder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawEncoder.java index 894f20c7607..f4d242eefb0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawEncoder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/XORRawEncoder.java @@ -34,7 +34,7 @@ public class XORRawEncoder extends AbstractRawErasureEncoder { protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) { ByteBuffer output = outputs[0]; - resetOutputBuffer(output); + resetBuffer(output); // Get the first buffer's data. int iIdx, oIdx; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/GaloisField.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/GaloisField.java index 62b22c9fce3..03683b03def 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/GaloisField.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/rawcoder/util/GaloisField.java @@ -423,7 +423,7 @@ public class GaloisField { byte[] pi = p[i]; for (iIdx = offsets[i], oIdx = offset; iIdx < offsets[i] + len; iIdx++, oIdx++) { - int pij = pi[iIdx] & 0x000000FF; + int pij = pi != null ? pi[iIdx] & 0x000000FF : 0; q[oIdx] = (byte) (q[oIdx] ^ mulTable[pij][y]); } y = mulTable[x][y]; @@ -438,13 +438,15 @@ public class GaloisField { * @param q store the return result * @param x input field */ - public void substitute(ByteBuffer[] p, ByteBuffer q, int x) { + public void substitute(ByteBuffer[] p, int len, ByteBuffer q, int x) { int y = 1, iIdx, oIdx; for (int i = 0; i < p.length; i++) { ByteBuffer pi = p[i]; - for (iIdx = pi.position(), oIdx = q.position(); - iIdx < pi.limit(); iIdx++, oIdx++) { - int pij = pi.get(iIdx) & 0x000000FF; + int pos = pi != null ? 
pi.position() : 0; + int limit = pi != null ? pi.limit() : len; + for (oIdx = q.position(), iIdx = pos; + iIdx < limit; iIdx++, oIdx++) { + int pij = pi != null ? pi.get(iIdx) & 0x000000FF : 0; q.put(oIdx, (byte) (q.get(oIdx) ^ mulTable[pij][y])); } y = mulTable[x][y]; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java index 3686695fb6d..9f50f335007 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/TestCoderBase.java @@ -35,7 +35,7 @@ public abstract class TestCoderBase { private Configuration conf; protected int numDataUnits; protected int numParityUnits; - protected int baseChunkSize = 16 * 1024; + protected int baseChunkSize = 513; private int chunkSize = baseChunkSize; private byte[] zeroChunkBytes; @@ -186,8 +186,9 @@ public abstract class TestCoderBase { } /** - * Erase chunks to test the recovering of them. Before erasure clone them - * first so could return them. + * Erase some data chunks to test the recovering of them. As they're erased, + * we don't need to read them and will not have the buffers at all, so just + * set them as null. * @param dataChunks * @param parityChunks * @return clone of erased chunks @@ -198,50 +199,30 @@ public abstract class TestCoderBase { erasedDataIndexes.length]; int idx = 0; - ECChunk chunk; for (int i = 0; i < erasedParityIndexes.length; i++) { - chunk = parityChunks[erasedParityIndexes[i]]; - toEraseChunks[idx ++] = cloneChunkWithData(chunk); - eraseDataFromChunk(chunk); + toEraseChunks[idx ++] = parityChunks[erasedParityIndexes[i]]; + parityChunks[erasedParityIndexes[i]] = null; } for (int i = 0; i < erasedDataIndexes.length; i++) { - chunk = dataChunks[erasedDataIndexes[i]]; - toEraseChunks[idx ++] = cloneChunkWithData(chunk); - eraseDataFromChunk(chunk); + toEraseChunks[idx ++] = dataChunks[erasedDataIndexes[i]]; + dataChunks[erasedDataIndexes[i]] = null; } return toEraseChunks; } /** - * Erase data from the specified chunks, putting ZERO bytes to the buffers. + * Erase data from the specified chunks, just setting them as null. * @param chunks */ protected void eraseDataFromChunks(ECChunk[] chunks) { for (int i = 0; i < chunks.length; i++) { - eraseDataFromChunk(chunks[i]); + chunks[i] = null; } } - /** - * Erase data from the specified chunk, putting ZERO bytes to the buffer. - * @param chunk with a buffer ready to read at the current position - */ - protected void eraseDataFromChunk(ECChunk chunk) { - ByteBuffer chunkBuffer = chunk.getBuffer(); - // Erase the data at the position, and restore the buffer ready for reading - // same many bytes but all ZERO. - int pos = chunkBuffer.position(); - int len = chunkBuffer.remaining(); - chunkBuffer.put(zeroChunkBytes, 0, len); - // Back to readable again after data erased - chunkBuffer.flip(); - chunkBuffer.position(pos); - chunkBuffer.limit(pos + len); - } - /** * Clone chunks along with copying the associated data. It respects how the * chunk buffer is allocated, direct or non-direct. 
It avoids affecting the diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java index f9666b6b4b9..98fa95614c9 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/coder/TestErasureCoderBase.java @@ -232,7 +232,6 @@ public abstract class TestErasureCoderBase extends TestCoderBase { TestBlock[] parityBlocks) { TestBlock[] toEraseBlocks = new TestBlock[erasedDataIndexes.length + erasedParityIndexes.length]; - int idx = 0; TestBlock block; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRSRawCoder.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRSRawCoder.java index 80ec04d0d4c..7b7ea42ca95 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRSRawCoder.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRSRawCoder.java @@ -32,89 +32,86 @@ public class TestRSRawCoder extends TestRSRawCoderBase { } @Test - public void testCodingNoDirectBuffer_10x4_erasing_d0_p0() { + public void testCoding_6x3_erasing_all_d() { + prepare(null, 6, 3, new int[]{0, 1, 2}, new int[0], true); + testCodingDoMixAndTwice(); + } + + @Test + public void testCoding_6x3_erasing_d0_d2() { + prepare(null, 6, 3, new int[] {0, 2}, new int[]{}); + testCodingDoMixAndTwice(); + } + + @Test + public void testCoding_6x3_erasing_d0() { + prepare(null, 6, 3, new int[]{0}, new int[0]); + testCodingDoMixAndTwice(); + } + + @Test + public void testCoding_6x3_erasing_d2() { + prepare(null, 6, 3, new int[]{2}, new int[]{}); + testCodingDoMixAndTwice(); + } + + @Test + public void testCoding_6x3_erasing_d0_p0() { + prepare(null, 6, 3, new int[]{0}, new int[]{0}); + testCodingDoMixAndTwice(); + } + + @Test + public void testCoding_6x3_erasing_all_p() { + prepare(null, 6, 3, new int[0], new int[]{0, 1, 2}); + testCodingDoMixAndTwice(); + } + + @Test + public void testCoding_6x3_erasing_p0() { + prepare(null, 6, 3, new int[0], new int[]{0}); + testCodingDoMixAndTwice(); + } + + @Test + public void testCoding_6x3_erasing_p2() { + prepare(null, 6, 3, new int[0], new int[]{2}); + testCodingDoMixAndTwice(); + } + + @Test + public void testCoding_6x3_erasure_p0_p2() { + prepare(null, 6, 3, new int[0], new int[]{0, 2}); + testCodingDoMixAndTwice(); + } + + @Test + public void testCoding_6x3_erasing_d0_p0_p1() { + prepare(null, 6, 3, new int[]{0}, new int[]{0, 1}); + testCodingDoMixAndTwice(); + } + + @Test + public void testCoding_6x3_erasing_d0_d2_p2() { + prepare(null, 6, 3, new int[]{0, 2}, new int[]{2}); + testCodingDoMixAndTwice(); + } + + @Test + public void testCodingNegative_6x3_erasing_d2_d4() { + prepare(null, 6, 3, new int[]{2, 4}, new int[0]); + testCodingDoMixAndTwice(); + } + + @Test + public void testCodingNegative_6x3_erasing_too_many() { + prepare(null, 6, 3, new int[]{2, 4}, new int[]{0, 1}); + testCodingWithErasingTooMany(); + } + + @Test + public void testCoding_10x4_erasing_d0_p0() { prepare(null, 10, 4, new int[] {0}, new int[] {0}); - /** - * Doing twice to test if the coders can be repeatedly reused. 
This matters - * as the underlying coding buffers are shared, which may have bugs. - */ - testCoding(false); - testCoding(false); - } - - @Test - public void testCodingDirectBuffer_10x4_erasing_p1() { - prepare(null, 10, 4, new int[0], new int[] {1}); - testCoding(true); - testCoding(true); - } - - @Test - public void testCodingDirectBuffer_10x4_erasing_d2() { - prepare(null, 10, 4, new int[] {2}, new int[] {}); - testCoding(true); - testCoding(true); - } - - @Test - public void testCodingDirectBuffer_10x4_erasing_d0_p0() { - prepare(null, 10, 4, new int[] {0}, new int[] {0}); - testCoding(true); - testCoding(true); - } - - @Test - public void testCodingBothBuffers_10x4_erasing_d0_p0() { - prepare(null, 10, 4, new int[] {0}, new int[] {0}); - - /** - * Doing in mixed buffer usage model to test if the coders can be repeatedly - * reused with different buffer usage model. This matters as the underlying - * coding buffers are shared, which may have bugs. - */ - testCoding(true); - testCoding(false); - testCoding(true); - testCoding(false); - } - - @Test - public void testCodingDirectBuffer_10x4_erasure_of_d2_d4_p0() { - prepare(null, 10, 4, new int[]{2, 4}, new int[]{0}); - testCoding(true); - } - - @Test - public void testCodingDirectBuffer_usingFixedData_10x4_erasure_of_d2_d4_p0() { - prepare(null, 10, 4, new int[] {2, 4}, new int[] {0}, true); - testCoding(true); - } - - @Test - public void testCodingDirectBuffer_10x4_erasing_d0_d1_p0_p1() { - prepare(null, 10, 4, new int[] {0, 1}, new int[] {0, 1}); - testCoding(true); - } - - @Test - public void testCodingNoDirectBuffer_3x3_erasing_d0_p0() { - prepare(null, 3, 3, new int[] {0}, new int[] {0}); - testCoding(false); - } - - @Test - public void testCodingDirectBuffer_3x3_erasing_d0_p0() { - prepare(null, 3, 3, new int[] {0}, new int[] {0}); - testCoding(true); - } - - @Test - public void testCodingNegative_10x4_erasing_d2_d4() { - prepare(null, 10, 4, new int[]{2, 4}, new int[0]); - - testCodingWithBadInput(true); - testCodingWithBadOutput(false); - testCodingWithBadInput(true); - testCodingWithBadOutput(false); + testCodingDoMixAndTwice(); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java index cfaa2c542a5..dd5452b7a6e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestRawCoderBase.java @@ -20,6 +20,7 @@ package org.apache.hadoop.io.erasurecode.rawcoder; import org.apache.hadoop.io.erasurecode.ECChunk; import org.apache.hadoop.io.erasurecode.TestCoderBase; import org.junit.Assert; +import org.junit.Test; import java.lang.reflect.Constructor; @@ -32,6 +33,25 @@ public abstract class TestRawCoderBase extends TestCoderBase { private RawErasureEncoder encoder; private RawErasureDecoder decoder; + /** + * Doing twice to test if the coders can be repeatedly reused. This matters + * as the underlying coding buffers are shared, which may have bugs. + */ + protected void testCodingDoMixAndTwice() { + testCodingDoMixed(); + testCodingDoMixed(); + } + + /** + * Doing in mixed buffer usage model to test if the coders can be repeatedly + * reused with different buffer usage model. This matters as the underlying + * coding buffers are shared, which may have bugs. 
+ */ + protected void testCodingDoMixed() { + testCoding(true); + testCoding(false); + } + /** * Generating source data, encoding, recovering and then verifying. * RawErasureCoder mainly uses ECChunk to pass input and output data buffers, @@ -85,6 +105,23 @@ public abstract class TestRawCoderBase extends TestCoderBase { } } + @Test + public void testCodingWithErasingTooMany() { + try { + testCoding(true); + Assert.fail("Decoding test erasing too many should fail"); + } catch (Exception e) { + // Expected + } + + try { + testCoding(false); + Assert.fail("Decoding test erasing too many should fail"); + } catch (Exception e) { + // Expected + } + } + private void performTestCoding(int chunkSize, boolean useBadInput, boolean useBadOutput) { setChunkSize(chunkSize); @@ -110,6 +147,9 @@ public abstract class TestRawCoderBase extends TestCoderBase { ECChunk[] inputChunks = prepareInputChunksForDecoding( clonedDataChunks, parityChunks); + // Remove unnecessary chunks, allowing only least required chunks to be read. + ensureOnlyLeastRequiredChunks(inputChunks); + ECChunk[] recoveredChunks = prepareOutputChunksForDecoding(); if (useBadOutput) { corruptSomeChunk(recoveredChunks); @@ -131,6 +171,20 @@ public abstract class TestRawCoderBase extends TestCoderBase { } } + private void ensureOnlyLeastRequiredChunks(ECChunk[] inputChunks) { + int leastRequiredNum = numDataUnits; + int erasedNum = erasedDataIndexes.length + erasedParityIndexes.length; + int goodNum = inputChunks.length - erasedNum; + int redundantNum = goodNum - leastRequiredNum; + + for (int i = 0; i < inputChunks.length && redundantNum > 0; i++) { + if (inputChunks[i] != null) { + inputChunks[i] = null; // Setting it null, not needing it actually + redundantNum--; + } + } + } + /** * Create the raw erasure encoder to test * @return diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestXORRawCoder.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestXORRawCoder.java index 327174ef836..48463ad1fd3 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestXORRawCoder.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/erasurecode/rawcoder/TestXORRawCoder.java @@ -29,58 +29,35 @@ public class TestXORRawCoder extends TestRawCoderBase { public void setup() { this.encoderClass = XORRawEncoder.class; this.decoderClass = XORRawDecoder.class; - - this.numDataUnits = 10; - this.numParityUnits = 1; } @Test - public void testCodingNoDirectBuffer_erasing_d0() { + public void testCoding_10x1_erasing_d0() { prepare(null, 10, 1, new int[] {0}, new int[0]); - - /** - * Doing twice to test if the coders can be repeatedly reused. This matters - * as the underlying coding buffers are shared, which may have bugs. 
- */ - testCoding(false); - testCoding(false); + testCodingDoMixAndTwice(); } @Test - public void testCodingDirectBuffer_erasing_p0() { + public void testCoding_10x1_erasing_p0() { prepare(null, 10, 1, new int[0], new int[] {0}); - - testCoding(true); - testCoding(true); + testCodingDoMixAndTwice(); } @Test - public void testCodingDirectBuffer_erasing_d0() { - prepare(null, 10, 1, new int[] {0}, new int[0]); - - testCoding(true); - testCoding(true); - } - - @Test - public void testCodingBothBuffers_erasing_d5() { + public void testCoding_10x1_erasing_d5() { prepare(null, 10, 1, new int[]{5}, new int[0]); - - /** - * Doing in mixed buffer usage model to test if the coders can be repeatedly - * reused with different buffer usage model. This matters as the underlying - * coding buffers are shared, which may have bugs. - */ - testCoding(true); - testCoding(false); - testCoding(true); - testCoding(false); + testCodingDoMixAndTwice(); } @Test - public void testCodingNegative_erasing_d5() { - prepare(null, 10, 1, new int[]{5}, new int[0]); + public void testCodingNegative_10x1_erasing_too_many() { + prepare(null, 10, 1, new int[]{2}, new int[]{0}); + testCodingWithErasingTooMany(); + } + @Test + public void testCodingNegative_10x1_erasing_d5() { + prepare(null, 10, 1, new int[]{5}, new int[0]); testCodingWithBadInput(true); testCodingWithBadOutput(false); testCodingWithBadInput(true);
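
The decode() contract documented in RawErasureDecoder above is the heart of this change: erased units and units the caller chooses not to read are both passed as null, so a decoder touches only the least required inputs. A minimal caller-side sketch of that contract, assuming an RS(6, 3) RSRawDecoder, an arbitrary chunk size of 513 bytes, and placeholder buffer contents rather than real encoded data:

import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;

public class RsDecodeLeastInputsExample {
  public static void main(String[] args) {
    // RS(6, 3): 6 data units, 3 parity units; chunk size is illustrative.
    RawErasureDecoder decoder = new RSRawDecoder(6, 3);
    int chunkSize = 513;

    // Layout per the decode() javadoc: parity units first, then data units.
    // index:  0   1   2   3   4   5   6   7   8
    // unit:  p0  p1  p2  d0  d1  d2  d3  d4  d5
    byte[][] inputs = new byte[9][];
    for (int i = 0; i < inputs.length; i++) {
      inputs[i] = new byte[chunkSize]; // placeholder for the unit's real bytes
    }
    inputs[5] = null; // d2 is erased, nothing can be read from it
    inputs[1] = null; // p1 deliberately not read: redundant for one erasure
    inputs[3] = null; // d0 deliberately not read: redundant for one erasure

    int[] erasedIndexes = new int[] {5};                    // recover d2 only
    byte[][] outputs = new byte[][] {new byte[chunkSize]};  // one writable buffer

    // Exactly numDataUnits (6) inputs remain non-null -- the least required.
    decoder.decode(inputs, erasedIndexes, outputs);
    // outputs[0] now holds the reconstructed d2 unit.
  }
}

This mirrors the RS (6, 3) example in the javadoc: d2 sits at index 5 of the parity-first inputs array, so erasedIndexes is {5} and a single writable output buffer suffices, while the test-side ensureOnlyLeastRequiredChunks helper nulls out redundant inputs in exactly the same way.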
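
Inside RSRawDecoder, the adjustment loops above translate the caller's view (erasedIndexes, only the units actually wanted back) into the solver's view (every null input position needs an output buffer). The following standalone sketch restates that mapping for the byte-array path; the method name is illustrative, and scratch arrays stand in for the reused bytesArrayBuffers and offset bookkeeping of the real code:

// Simplified restatement of the output adjustment in RSRawDecoder.doDecode
// (byte[] path); validation against mismatched erasedIndexes is omitted here.
static byte[][] adjustOutputs(byte[][] inputs, int[] erasedIndexes,
    byte[][] callerOutputs, int dataLen) {
  // Every null input position needs an output from the underlying GF solver,
  // whether the caller asked for it (erased) or merely skipped reading it.
  int[] nullPositions = new int[inputs.length];
  int count = 0;
  for (int i = 0; i < inputs.length; i++) {
    if (inputs[i] == null) {
      nullPositions[count++] = i;
    }
  }

  byte[][] adjusted = new byte[count][];
  // Place the caller's buffers at the positions listed in erasedIndexes so the
  // recovered units never need an extra copy afterwards ...
  int outputIdx = 0;
  for (int erased : erasedIndexes) {
    for (int j = 0; j < count; j++) {
      if (nullPositions[j] == erased) {
        adjusted[j] = callerOutputs[outputIdx++];
      }
    }
  }
  // ... and give the remaining (not-to-read) positions scratch buffers whose
  // decoded contents are simply discarded.
  for (int j = 0; j < count; j++) {
    if (adjusted[j] == null) {
      adjusted[j] = new byte[dataLen];
    }
  }
  return adjusted;
}

Decoding into scratch space for the not-to-read positions is the wasted work called out in the RSRawDecoder class comment above, which HADOOP-11871 is meant to remove once the GF layer no longer requires it.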