HADOOP-12685. Input buffer position after encode/decode not consistent between different kinds of buffers. Contributed by Rui Li.

Change-Id: I713c7b4e3cfae70c04b7e4b292ab53eae348d8d9
This commit is contained in:
Zhe Zhang 2016-01-05 16:31:52 -08:00
parent 355c0ce723
commit c52b407cbf
7 changed files with 114 additions and 55 deletions

View File

@ -638,6 +638,9 @@ Trunk (Unreleased)
HADOOP-12544. Erasure Coding: create dummy raw coder to isolate performance HADOOP-12544. Erasure Coding: create dummy raw coder to isolate performance
issues in testing. (Rui Li via zhz) issues in testing. (Rui Li via zhz)
HADOOP-12685. Input buffer position after encode/decode not consistent
between different kinds of buffers. (Rui Li via zhz)
Release 2.9.0 - UNRELEASED Release 2.9.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -51,39 +51,44 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
checkParameterBuffers(inputs, true, dataLen, usingDirectBuffer, false); checkParameterBuffers(inputs, true, dataLen, usingDirectBuffer, false);
checkParameterBuffers(outputs, false, dataLen, usingDirectBuffer, true); checkParameterBuffers(outputs, false, dataLen, usingDirectBuffer, true);
if (usingDirectBuffer) { int[] inputPositions = new int[inputs.length];
doDecode(inputs, erasedIndexes, outputs); for (int i = 0; i < inputPositions.length; i++) {
return; if (inputs[i] != null) {
} inputPositions[i] = inputs[i].position();
int[] inputOffsets = new int[inputs.length];
int[] outputOffsets = new int[outputs.length];
byte[][] newInputs = new byte[inputs.length][];
byte[][] newOutputs = new byte[outputs.length][];
ByteBuffer buffer;
for (int i = 0; i < inputs.length; ++i) {
buffer = inputs[i];
if (buffer != null) {
inputOffsets[i] = buffer.arrayOffset() + buffer.position();
newInputs[i] = buffer.array();
} }
} }
for (int i = 0; i < outputs.length; ++i) { if (usingDirectBuffer) {
buffer = outputs[i]; doDecode(inputs, erasedIndexes, outputs);
outputOffsets[i] = buffer.arrayOffset() + buffer.position(); } else {
newOutputs[i] = buffer.array(); int[] inputOffsets = new int[inputs.length];
int[] outputOffsets = new int[outputs.length];
byte[][] newInputs = new byte[inputs.length][];
byte[][] newOutputs = new byte[outputs.length][];
ByteBuffer buffer;
for (int i = 0; i < inputs.length; ++i) {
buffer = inputs[i];
if (buffer != null) {
inputOffsets[i] = buffer.arrayOffset() + buffer.position();
newInputs[i] = buffer.array();
}
}
for (int i = 0; i < outputs.length; ++i) {
buffer = outputs[i];
outputOffsets[i] = buffer.arrayOffset() + buffer.position();
newOutputs[i] = buffer.array();
}
doDecode(newInputs, inputOffsets, dataLen,
erasedIndexes, newOutputs, outputOffsets);
} }
doDecode(newInputs, inputOffsets, dataLen, for (int i = 0; i < inputs.length; i++) {
erasedIndexes, newOutputs, outputOffsets); if (inputs[i] != null) {
for (int i = 0; i < inputs.length; ++i) {
buffer = inputs[i];
if (buffer != null) {
// dataLen bytes consumed // dataLen bytes consumed
buffer.position(buffer.position() + dataLen); inputs[i].position(inputPositions[i] + dataLen);
} }
} }
} }

View File

@ -48,34 +48,42 @@ public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder
checkParameterBuffers(inputs, false, dataLen, usingDirectBuffer, false); checkParameterBuffers(inputs, false, dataLen, usingDirectBuffer, false);
checkParameterBuffers(outputs, false, dataLen, usingDirectBuffer, true); checkParameterBuffers(outputs, false, dataLen, usingDirectBuffer, true);
int[] inputPositions = new int[inputs.length];
for (int i = 0; i < inputPositions.length; i++) {
if (inputs[i] != null) {
inputPositions[i] = inputs[i].position();
}
}
if (usingDirectBuffer) { if (usingDirectBuffer) {
doEncode(inputs, outputs); doEncode(inputs, outputs);
return; } else {
int[] inputOffsets = new int[inputs.length];
int[] outputOffsets = new int[outputs.length];
byte[][] newInputs = new byte[inputs.length][];
byte[][] newOutputs = new byte[outputs.length][];
ByteBuffer buffer;
for (int i = 0; i < inputs.length; ++i) {
buffer = inputs[i];
inputOffsets[i] = buffer.arrayOffset() + buffer.position();
newInputs[i] = buffer.array();
}
for (int i = 0; i < outputs.length; ++i) {
buffer = outputs[i];
outputOffsets[i] = buffer.arrayOffset() + buffer.position();
newOutputs[i] = buffer.array();
}
doEncode(newInputs, inputOffsets, dataLen, newOutputs, outputOffsets);
} }
int[] inputOffsets = new int[inputs.length]; for (int i = 0; i < inputs.length; i++) {
int[] outputOffsets = new int[outputs.length]; if (inputs[i] != null) {
byte[][] newInputs = new byte[inputs.length][]; // dataLen bytes consumed
byte[][] newOutputs = new byte[outputs.length][]; inputs[i].position(inputPositions[i] + dataLen);
}
ByteBuffer buffer;
for (int i = 0; i < inputs.length; ++i) {
buffer = inputs[i];
inputOffsets[i] = buffer.arrayOffset() + buffer.position();
newInputs[i] = buffer.array();
}
for (int i = 0; i < outputs.length; ++i) {
buffer = outputs[i];
outputOffsets[i] = buffer.arrayOffset() + buffer.position();
newOutputs[i] = buffer.array();
}
doEncode(newInputs, inputOffsets, dataLen, newOutputs, outputOffsets);
for (int i = 0; i < inputs.length; ++i) {
buffer = inputs[i];
buffer.position(buffer.position() + dataLen); // dataLen bytes consumed
} }
} }

View File

@ -56,9 +56,10 @@ public interface RawErasureDecoder extends RawErasureCoder {
* *
* If the coder option ALLOW_CHANGE_INPUTS is set true (false by default), the * If the coder option ALLOW_CHANGE_INPUTS is set true (false by default), the
* content of input buffers may change after the call, subject to concrete * content of input buffers may change after the call, subject to concrete
* implementation. Anyway the positions of input buffers will move forward. * implementation.
* *
* @param inputs input buffers to read data from * @param inputs input buffers to read data from. The buffers' remaining will
* be 0 after decoding
* @param erasedIndexes indexes of erased units in the inputs array * @param erasedIndexes indexes of erased units in the inputs array
* @param outputs output buffers to put decoded data into according to * @param outputs output buffers to put decoded data into according to
* erasedIndexes, ready for read after the call * erasedIndexes, ready for read after the call

View File

@ -42,8 +42,9 @@ public interface RawErasureEncoder extends RawErasureCoder {
* content of input buffers may change after the call, subject to concrete * content of input buffers may change after the call, subject to concrete
* implementation. Anyway the positions of input buffers will move forward. * implementation. Anyway the positions of input buffers will move forward.
* *
* @param inputs input buffers to read data from * @param inputs input buffers to read data from. The buffers' remaining will
* @param outputs output buffers to put the encoded data into, read to read * be 0 after encoding
* @param outputs output buffers to put the encoded data into, ready to read
* after the call * after the call
*/ */
void encode(ByteBuffer[] inputs, ByteBuffer[] outputs); void encode(ByteBuffer[] inputs, ByteBuffer[] outputs);

View File

@ -115,4 +115,11 @@ public class TestRSRawCoder extends TestRSRawCoderBase {
prepare(null, 10, 4, new int[] {0}, new int[] {0}); prepare(null, 10, 4, new int[] {0}, new int[] {0});
testCodingDoMixAndTwice(); testCodingDoMixAndTwice();
} }
@Test
public void testCodingInputBufferPosition() {
prepare(null, 6, 3, new int[]{0}, new int[]{0});
testInputPosition(false);
testInputPosition(true);
}
} }

View File

@ -254,4 +254,38 @@ public abstract class TestRawCoderBase extends TestCoderBase {
decoder.setConf(getConf()); decoder.setConf(getConf());
return decoder; return decoder;
} }
/**
* Tests that the input buffer's position is moved to the end after
* encode/decode.
*/
protected void testInputPosition(boolean usingDirectBuffer) {
this.usingDirectBuffer = usingDirectBuffer;
prepareCoders();
prepareBufferAllocator(false);
// verify encode
ECChunk[] dataChunks = prepareDataChunksForEncoding();
ECChunk[] parityChunks = prepareParityChunksForEncoding();
ECChunk[] clonedDataChunks = cloneChunksWithData(dataChunks);
encoder.encode(dataChunks, parityChunks);
verifyBufferPositionAtEnd(dataChunks);
// verify decode
backupAndEraseChunks(clonedDataChunks, parityChunks);
ECChunk[] inputChunks = prepareInputChunksForDecoding(
clonedDataChunks, parityChunks);
ensureOnlyLeastRequiredChunks(inputChunks);
ECChunk[] recoveredChunks = prepareOutputChunksForDecoding();
decoder.decode(inputChunks, getErasedIndexesForDecoding(), recoveredChunks);
verifyBufferPositionAtEnd(inputChunks);
}
private void verifyBufferPositionAtEnd(ECChunk[] inputChunks) {
for (ECChunk chunk : inputChunks) {
if (chunk != null) {
Assert.assertEquals(0, chunk.getBuffer().remaining());
}
}
}
} }