HADOOP-12327. Initialize output buffers with ZERO bytes in erasure coder. Contributed by Kai Zheng.

This commit is contained in:
Walter Su 2015-10-29 15:02:06 +08:00
parent 656c8f9527
commit 5eca6dece6
9 changed files with 49 additions and 33 deletions

View File

@ -603,6 +603,9 @@ Trunk (Unreleased)
HADOOP-11921. Enhance tests for erasure coders. (Kai Zheng)
HADOOP-12327. Initialize output buffers with ZERO bytes in erasure coder.
(Kai Zheng via waltersu4549)
Release 2.8.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -32,6 +32,7 @@ import java.nio.ByteBuffer;
public abstract class AbstractRawErasureCoder
extends Configured implements RawErasureCoder {
private static byte[] emptyChunk = new byte[4096];
private final int numDataUnits;
private final int numParityUnits;
private final int numAllUnits;
@ -42,6 +43,23 @@ public abstract class AbstractRawErasureCoder
this.numAllUnits = numDataUnits + numParityUnits;
}
/**
* Make sure to return an empty chunk buffer for the desired length.
* @param leastLength
* @return empty chunk of zero bytes
*/
protected static byte[] getEmptyChunk(int leastLength) {
if (emptyChunk.length >= leastLength) {
return emptyChunk; // In most time
}
synchronized (AbstractRawErasureCoder.class) {
emptyChunk = new byte[leastLength];
}
return emptyChunk;
}
@Override
public int getNumDataUnits() {
return numDataUnits;
@ -73,11 +91,9 @@ public abstract class AbstractRawErasureCoder
* @return the buffer itself, with ZERO bytes written, the position and limit
* are not changed after the call
*/
protected ByteBuffer resetBuffer(ByteBuffer buffer) {
protected ByteBuffer resetBuffer(ByteBuffer buffer, int len) {
int pos = buffer.position();
for (int i = pos; i < buffer.limit(); ++i) {
buffer.put((byte) 0);
}
buffer.put(getEmptyChunk(len), 0, len);
buffer.position(pos);
return buffer;
@ -90,9 +106,8 @@ public abstract class AbstractRawErasureCoder
* @return the buffer itself
*/
protected byte[] resetBuffer(byte[] buffer, int offset, int len) {
for (int i = offset; i < len; ++i) {
buffer[i] = (byte) 0;
}
byte[] empty = getEmptyChunk(len);
System.arraycopy(empty, 0, buffer, offset, len);
return buffer;
}
@ -104,9 +119,10 @@ public abstract class AbstractRawErasureCoder
* @param allowNull whether to allow any element to be null or not
* @param dataLen the length of data available in the buffer to ensure with
* @param isDirectBuffer is direct buffer or not to ensure with
* @param isOutputs is output buffer or not
*/
protected void ensureLengthAndType(ByteBuffer[] buffers, boolean allowNull,
int dataLen, boolean isDirectBuffer) {
protected void checkParameterBuffers(ByteBuffer[] buffers, boolean
allowNull, int dataLen, boolean isDirectBuffer, boolean isOutputs) {
for (ByteBuffer buffer : buffers) {
if (buffer == null && !allowNull) {
throw new HadoopIllegalArgumentException(
@ -120,18 +136,23 @@ public abstract class AbstractRawErasureCoder
throw new HadoopIllegalArgumentException(
"Invalid buffer, isDirect should be " + isDirectBuffer);
}
if (isOutputs) {
resetBuffer(buffer, dataLen);
}
}
}
}
/**
* Check and ensure the buffers are of the length specified by dataLen.
* Check and ensure the buffers are of the length specified by dataLen. If is
* output buffers, ensure they will be ZEROed.
* @param buffers the buffers to check
* @param allowNull whether to allow any element to be null or not
* @param dataLen the length of data available in the buffer to ensure with
* @param isOutputs is output buffer or not
*/
protected void ensureLength(byte[][] buffers,
boolean allowNull, int dataLen) {
protected void checkParameterBuffers(byte[][] buffers, boolean allowNull,
int dataLen, boolean isOutputs) {
for (byte[] buffer : buffers) {
if (buffer == null && !allowNull) {
throw new HadoopIllegalArgumentException(
@ -139,6 +160,8 @@ public abstract class AbstractRawErasureCoder
} else if (buffer != null && buffer.length != dataLen) {
throw new HadoopIllegalArgumentException(
"Invalid buffer not of length " + dataLen);
} else if (isOutputs) {
resetBuffer(buffer, 0, dataLen);
}
}
}

View File

@ -48,8 +48,8 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
if (dataLen == 0) {
return;
}
ensureLengthAndType(inputs, true, dataLen, usingDirectBuffer);
ensureLengthAndType(outputs, false, dataLen, usingDirectBuffer);
checkParameterBuffers(inputs, true, dataLen, usingDirectBuffer, false);
checkParameterBuffers(outputs, false, dataLen, usingDirectBuffer, true);
if (usingDirectBuffer) {
doDecode(inputs, erasedIndexes, outputs);
@ -106,8 +106,8 @@ public abstract class AbstractRawErasureDecoder extends AbstractRawErasureCoder
if (dataLen == 0) {
return;
}
ensureLength(inputs, true, dataLen);
ensureLength(outputs, false, dataLen);
checkParameterBuffers(inputs, true, dataLen, false);
checkParameterBuffers(outputs, false, dataLen, true);
int[] inputOffsets = new int[inputs.length]; // ALL ZERO
int[] outputOffsets = new int[outputs.length]; // ALL ZERO

View File

@ -45,8 +45,8 @@ public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder
if (dataLen == 0) {
return;
}
ensureLengthAndType(inputs, false, dataLen, usingDirectBuffer);
ensureLengthAndType(outputs, false, dataLen, usingDirectBuffer);
checkParameterBuffers(inputs, false, dataLen, usingDirectBuffer, false);
checkParameterBuffers(outputs, false, dataLen, usingDirectBuffer, true);
if (usingDirectBuffer) {
doEncode(inputs, outputs);
@ -93,8 +93,8 @@ public abstract class AbstractRawErasureEncoder extends AbstractRawErasureCoder
if (dataLen == 0) {
return;
}
ensureLength(inputs, false, dataLen);
ensureLength(outputs, false, dataLen);
checkParameterBuffers(inputs, false, dataLen, false);
checkParameterBuffers(outputs, false, dataLen, true);
int[] inputOffsets = new int[inputs.length]; // ALL ZERO
int[] outputOffsets = new int[outputs.length]; // ALL ZERO

View File

@ -206,7 +206,7 @@ public class RSRawDecoder extends AbstractRawErasureDecoder {
if (erasedIndexes[i] == erasedOrNotToReadIndexes[j]) {
found = true;
adjustedDirectBufferOutputsParameter[j] =
resetBuffer(outputs[outputIdx++]);
resetBuffer(outputs[outputIdx++], dataLen);
}
}
if (!found) {
@ -220,7 +220,7 @@ public class RSRawDecoder extends AbstractRawErasureDecoder {
ByteBuffer buffer = checkGetDirectBuffer(bufferIdx, dataLen);
buffer.position(0);
buffer.limit(dataLen);
adjustedDirectBufferOutputsParameter[i] = resetBuffer(buffer);
adjustedDirectBufferOutputsParameter[i] = resetBuffer(buffer, dataLen);
bufferIdx++;
}
}

View File

@ -39,7 +39,6 @@ public class XORRawDecoder extends AbstractRawErasureDecoder {
protected void doDecode(ByteBuffer[] inputs, int[] erasedIndexes,
ByteBuffer[] outputs) {
ByteBuffer output = outputs[0];
resetBuffer(output);
int erasedIdx = erasedIndexes[0];

View File

@ -37,7 +37,6 @@ public class XORRawEncoder extends AbstractRawErasureEncoder {
protected void doEncode(ByteBuffer[] inputs, ByteBuffer[] outputs) {
ByteBuffer output = outputs[0];
resetBuffer(output);
// Get the first buffer's data.
int iIdx, oIdx;

View File

@ -221,9 +221,6 @@ public class DFSStripedOutputStream extends DFSOutputStream {
private void clear() {
for (int i = 0; i< numAllBlocks; i++) {
buffers[i].clear();
if (i >= numDataBlocks) {
Arrays.fill(buffers[i].array(), (byte) 0);
}
}
}

View File

@ -907,15 +907,10 @@ public final class ErasureCodingWorker {
for (int i = 0; i < targetBuffers.length; i++) {
if (targetBuffers[i] != null) {
cleanBuffer(targetBuffers[i]);
targetBuffers[i].clear();
}
}
}
private ByteBuffer cleanBuffer(ByteBuffer buffer) {
Arrays.fill(buffer.array(), (byte) 0);
return (ByteBuffer)buffer.clear();
}
// send an empty packet to mark the end of the block
private void endTargetBlocks(boolean[] targetsStatus) {